You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/21 20:04:19 UTC
svn commit: r687813 - in /incubator/qpid/trunk/qpid/cpp: src/
src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/sys/
src/tests/ xml/
Author: aconway
Date: Thu Aug 21 11:04:18 2008
New Revision: 687813
URL: http://svn.apache.org/viewvc?rev=687813&view=rev
Log:
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Aug 21 11:04:18 2008
@@ -546,6 +546,7 @@
qpid/sys/ConnectionInputHandler.h \
qpid/sys/ConnectionInputHandlerFactory.h \
qpid/sys/ConnectionOutputHandler.h \
+ qpid/sys/ConnectionOutputHandlerPtr.h \
qpid/sys/DeletionManager.h \
qpid/sys/Dispatcher.h \
qpid/sys/IOHandle.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Aug 21 11:04:18 2008
@@ -19,7 +19,11 @@
qpid/cluster/ShadowConnectionOutputHandler.h \
qpid/cluster/PollableCondition.h \
qpid/cluster/PollableCondition.cpp \
- qpid/cluster/PollableQueue.h
+ qpid/cluster/PollableQueue.h \
+ qpid/cluster/WriteEstimate.h \
+ qpid/cluster/WriteEstimate.cpp \
+ qpid/cluster/OutputInterceptor.h \
+ qpid/cluster/OutputInterceptor.cpp
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Thu Aug 21 11:04:18 2008
@@ -30,7 +30,7 @@
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
: frameQueueClosed(false), output(o),
connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient) {}
+ identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -53,7 +53,7 @@
bool Connection::canEncode() {
if (!frameQueueClosed) connection->doOutput();
- Mutex::ScopedLock l(frameQueueLock);
+ Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -71,10 +71,12 @@
initialized = true;
QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
- while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ size_t frameSize=0;
+ while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) {
frameQueue.front().encode(out);
QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop();
+ frameQueue.pop_front();
+ buffered -= frameSize;
}
assert(frameQueue.empty() || frameQueue.front().size() <= size);
if (!frameQueue.empty() && frameQueue.front().size() > size)
@@ -98,7 +100,8 @@
{
Mutex::ScopedLock l(frameQueueLock);
if (!frameQueueClosed)
- frameQueue.push(f);
+ frameQueue.push_back(f);
+ buffered += f.size();
}
activateOutput();
}
@@ -107,4 +110,9 @@
return framing::ProtocolVersion(0,10);
}
+size_t Connection::getBuffered() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return buffered;
+}
+
}} // namespace qpid::amqp_0_10
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu Aug 21 11:04:18 2008
@@ -26,7 +26,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/broker/Connection.h"
#include <boost/intrusive_ptr.hpp>
-#include <queue>
+#include <deque>
#include <memory>
namespace qpid {
@@ -36,7 +36,9 @@
class Connection : public sys::ConnectionCodec,
public sys::ConnectionOutputHandler
{
- std::queue<framing::AMQFrame> frameQueue;
+ typedef std::deque<framing::AMQFrame> FrameQueue;
+
+ FrameQueue frameQueue;
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
@@ -44,7 +46,8 @@
std::string identifier;
bool initialized;
bool isClient;
-
+ size_t buffered;
+
public:
Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
size_t decode(const char* buffer, size_t size);
@@ -56,6 +59,7 @@
void close(); // closing from this end.
void send(framing::AMQFrame&);
framing::ProtocolVersion getVersion() const;
+ size_t getBuffered() const;
};
}} // namespace qpid::amqp_0_10
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Aug 21 11:04:18 2008
@@ -79,7 +79,7 @@
void Connection::requestIOProcessing(boost::function0<void> callback)
{
ioCallback = callback;
- out->activateOutput();
+ out.activateOutput();
}
Connection::~Connection()
@@ -178,7 +178,6 @@
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
- // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -245,7 +244,7 @@
case management::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject != 0) mgmtObject->set_closing(1);
- out->activateOutput();
+ out.activateOutput();
status = Manageable::STATUS_OK;
break;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Aug 21 11:04:18 2008
@@ -24,7 +24,7 @@
#include <vector>
#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionOutputHandlerPtr.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/management/Manageable.h"
#include "Broker.h"
@@ -34,11 +34,14 @@
class ConnectionState : public ConnectionToken, public management::Manageable
{
+ protected:
+ sys::ConnectionOutputHandlerPtr out;
+
public:
- ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ out(o),
broker(b),
- outputTasks(*o),
- out(o),
+ outputTasks(out),
framemax(65535),
heartbeat(0),
stagingThreshold(broker.getStagingThreshold())
@@ -67,14 +70,13 @@
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
- void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+ void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
protected:
framing::ProtocolVersion version;
- sys::ConnectionOutputHandler* out;
uint32_t framemax;
uint16_t heartbeat;
uint64_t stagingThreshold;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 21 11:04:18 2008
@@ -25,12 +25,14 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
+#include <boost/current_function.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -76,11 +78,6 @@
cpg.join(name);
notify();
- // FIXME aconway 2008-08-11: can we remove this loop?
- // Dispatch till we show up in the cluster map.
- while (empty())
- cpg.dispatchOne();
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
@@ -97,9 +94,8 @@
std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
}
-// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
- bool isLocal = &c.getOutput() != &shadowOut;
+ bool isLocal = c.getOutput().get() != &shadowOut;
if (isLocal)
localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
@@ -107,10 +103,8 @@
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- // At this point the poller has already been shut down so
- // no dispatches can occur thru the cpgDispatchHandle.
- //
- // FIXME aconway 2008-08-11: assert this is the cae.
+ // Leave is called by from Broker destructor after the poller has
+ // been shut down. No dispatches can occur.
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
@@ -173,13 +167,6 @@
return result;
}
-// ################ HERE - leaking shadow connections.
-// FIXME aconway 2008-08-11: revisit memory management for shadow
-// connections, what if the Connection is closed other than via
-// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
-// map, add deleted state to ConnectionInterceptor? Interceptors need
-// to know about map? Check how Connections can be deleted.
-
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -274,7 +261,8 @@
break;
}
case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- connection->deliverDoOutput();
+ ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
+ connection->deliverDoOutput(doOutput.getBytes());
break;
}
default:
@@ -309,9 +297,8 @@
void Cluster::disconnect(sys::DispatchHandle& h) {
h.stopWatch();
- // FIXME aconway 2008-08-11: error handling if we are disconnected.
- // Kill the broker?
- assert(0);
+ QPID_LOG(critical, "Disconnected from cluster, shutting down");
+ broker.shutdown();
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Thu Aug 21 11:04:18 2008
@@ -22,6 +22,8 @@
#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
namespace qpid {
namespace cluster {
@@ -32,24 +34,27 @@
ConnectionInterceptor::ConnectionInterceptor(
broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
- : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+ : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
{
connection->addFinalizer(boost::bind(operator delete, this));
+ connection->setOutputHandler(&output),
// Attach my functions to Connection extension points.
shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
- shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
+ shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
}
ConnectionInterceptor::~ConnectionInterceptor() {
assert(connection == 0);
}
+// Forward all received frames to the cluster, continue handling on delivery.
void ConnectionInterceptor::received(framing::AMQFrame& f) {
if (isClosed) return;
cluster.send(f, this);
}
+// Continue normal handling of delivered frames.
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
receivedNext(f);
}
@@ -81,28 +86,17 @@
}
void ConnectionInterceptor::dirtyClose() {
- // Not closed via cluster self-delivery but closed locally.
- // Used for dirty cluster shutdown where active connections
- // must be cleaned up.
+ // Not closed via cluster self-delivery but closed locally. Used
+ // when local broker is shut down without a clean cluster shutdown.
+ // Release the connection, it will delete this.
connection = 0;
}
-bool ConnectionInterceptor::doOutput() {
- // FIXME aconway 2008-08-15: this is not correct.
- // Run in write threads so order of execution of doOutput is not determinate.
- // Will only work reliably for in single-consumer tests.
-
- if (connection->hasOutput()) {
- cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
- return doOutputNext();
- }
- return false;
-}
-
-void ConnectionInterceptor::deliverDoOutput() {
- // FIXME aconway 2008-08-15: see comment in doOutput.
- if (isShadow())
- doOutputNext();
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void ConnectionInterceptor::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Thu Aug 21 11:04:18 2008
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
-#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
+#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
/*
*
@@ -23,6 +23,8 @@
*/
#include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
#include "qpid/broker/Connection.h"
#include "qpid/sys/ConnectionOutputHandler.h"
@@ -41,42 +43,46 @@
Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
- bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
-
+ bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); }
+ bool isLocal() const { return !isShadow(); }
+ bool getClosed() const { return isClosed; }
+
// self-delivery of intercepted extension points.
void deliver(framing::AMQFrame& f);
void deliverClosed();
- void deliverDoOutput();
+ void deliverDoOutput(size_t requested);
void dirtyClose();
+ Cluster& getCluster() { return cluster; }
+
private:
struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
void close() {}
void send(framing::AMQFrame&) {}
- void doOutput() {}
void activateOutput() {}
};
- bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
-
// Functions to intercept to Connection extension points.
void received(framing::AMQFrame&);
void closed();
bool doOutput();
+ void activateOutput();
+
+ void sendDoOutput();
boost::function<void (framing::AMQFrame&)> receivedNext;
boost::function<void ()> closedNext;
- boost::function<bool ()> doOutputNext;
boost::intrusive_ptr<broker::Connection> connection;
Cluster& cluster;
NullConnectionHandler discardHandler;
bool isClosed;
Cluster::ShadowConnectionId shadowId;
+ WriteEstimate writeEstimate;
+ OutputInterceptor output;
};
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
-
+#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Aug 21 11:04:18 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "OutputInterceptor.h"
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+ : parent(p), next(h), sent(), moreOutput(), doingOutput()
+{}
+
+void OutputInterceptor::send(framing::AMQFrame& f) {
+ Locker l(lock);
+ next.send(f);
+ sent += f.size();
+}
+
+void OutputInterceptor::activateOutput() {
+ Locker l(lock);
+ moreOutput = true;
+ sendDoOutput();
+}
+
+// Called in write thread when the IO layer has no more data to write.
+// We do nothing in the write thread, we run doOutput only on delivery
+// of doOutput requests.
+bool OutputInterceptor::doOutput() {
+ return false;
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void OutputInterceptor::deliverDoOutput(size_t requested) {
+ if (parent.getClosed()) return;
+
+ Locker l(lock);
+ size_t buf = next.getBuffered();
+ if (parent.isLocal())
+ writeEstimate.delivered(sent, buf); // Update the estimate.
+
+ // Run the real doOutput() till we have added the requested data or there's nothing to output.
+ sent = 0;
+ do {
+ sys::Mutex::ScopedUnlock u(lock);
+ moreOutput = doOutputNext(); // Calls send()
+ } while (sent < requested && moreOutput);
+ sent += buf; // Include buffered data in the sent total.
+
+ QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
+
+ if (parent.isLocal() && moreOutput)
+ sendDoOutput();
+ else
+ doingOutput = false;
+}
+
+void OutputInterceptor::startDoOutput() {
+ if (!doingOutput)
+ sendDoOutput();
+}
+
+// Send a doOutput request if one is not already in flight.
+void OutputInterceptor::sendDoOutput() {
+ // Call with lock held.
+ if (parent.isShadow() || parent.getClosed())
+ return;
+
+ doingOutput = true;
+ size_t request = writeEstimate.sending(getBuffered());
+
+ // Note we may send 0 size request if there's more than 2*estimate in the buffer.
+ // Send it anyway to keep the doOutput chain going until we are sure there's no more output
+ // (in deliverDoOutput)
+ //
+ parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
+ framing::ProtocolVersion(), request)), &parent);
+ QPID_LOG(trace, &parent << "Send doOutput request for " << request);
+}
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/ConnectionFactory.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+class ConnectionInterceptor;
+
+/**
+ * Interceptor for connection OutputHandler, manages outgoing message replication.
+ */
+class OutputInterceptor : public sys::ConnectionOutputHandler {
+ public:
+ OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+
+ // sys::ConnectionOutputHandler functions
+ void send(framing::AMQFrame& f);
+ void activateOutput();
+ void close() { Locker l(lock); next.close(); }
+ size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+
+ // Delivery point for doOutput requests.
+ void deliverDoOutput(size_t requested);
+ // Intercept doOutput requests on Connection.
+ bool doOutput();
+
+ boost::function<bool ()> doOutputNext;
+
+ ConnectionInterceptor& parent;
+
+ private:
+ typedef sys::Mutex::ScopedLock Locker;
+
+ void startDoOutput();
+ void sendDoOutput();
+
+ mutable sys::Mutex lock;
+ sys::ConnectionOutputHandler& next;
+ size_t sent;
+ WriteEstimate writeEstimate;
+ bool moreOutput;
+ bool doingOutput;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp Thu Aug 21 11:04:18 2008
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/log/Statement.h"
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+WriteEstimate::WriteEstimate(size_t initial)
+ : growing(true), estimate(initial) {}
+
+size_t WriteEstimate::sending(size_t buffered) {
+ // We want to send a doOutput request for enough data such
+ // that if estimate bytes are written before it is self
+ // delivered then what is left in the buffer plus the doOutput
+ // request will be estimate bytes.
+
+ size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0;
+ size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0;
+ return request;
+}
+
+size_t pad(size_t value) { return value + value/2; }
+
+void WriteEstimate::delivered(size_t sent, size_t buffered) {
+ size_t wrote = sent > buffered ? sent - buffered : 0;
+ if (wrote == 0) // No change
+ return;
+ if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity.
+ growing = false;
+ estimate = pad(wrote); // Estimate at 1.5 write for padding.
+ }
+ else if (wrote > estimate) { // Wrote everything, buffer was under-stocked
+ if (growing)
+ estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock.
+ else
+ estimate = pad(wrote);
+ }
+}
+
+}} // namespace qpid::cluster
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,64 @@
+#ifndef QPID_CLUSTER_WRITEESTIMATE_H
+#define QPID_CLUSTER_WRITEESTIMATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Estimate the amount of data that a connection can write between sending
+ * a doOutput notice and re-receiving it.
+ *
+ * The goal is to avoid ever write-idling the connection by sending
+ * the next doOutput request as soon as we've processed the previous
+ * one, such that data generated by the previous request will keep the
+ * writer busy till the next one is delivered.
+ *
+ */
+class WriteEstimate
+{
+ public:
+ WriteEstimate(size_t initial=4096);
+
+ /** About to send a doOutput request.
+ * Update estimation state and return size for next request.
+ */
+ size_t sending(size_t buffered);
+
+ /**
+ * doOutput request just delivered, not yet executed. Update the estimate.
+ * and estimate how much data to request in the next onOutput
+ * request. 0 means don't send an onOutput request.
+ */
+ void delivered(size_t sent, size_t buffered);
+
+ private:
+ bool growing;
+ size_t estimate;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h Thu Aug 21 11:04:18 2008
@@ -34,6 +34,7 @@
{
public:
virtual void close() = 0;
+ virtual size_t getBuffered() const { return 0; }
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,54 @@
+#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionOutputHandler.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A ConnectionOutputHandler that delegates to another
+ * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
+ * to be changed modified without updating all the pointers/references
+ * using the ConnectionOutputHandlerPtr
+ */
+class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
+{
+ public:
+ ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {}
+ void set(ConnectionOutputHandler* p) { next = p; }
+ ConnectionOutputHandler* get() { return next; }
+ const ConnectionOutputHandler* get() const { return next; }
+
+ void close() { next->close(); }
+ size_t getBuffered() const { return next->getBuffered(); }
+ void activateOutput() { next->activateOutput(); }
+ void send(framing::AMQFrame& f) { next->send(f); }
+
+ private:
+ ConnectionOutputHandler* next;
+};
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Aug 21 11:04:18 2008
@@ -78,7 +78,7 @@
::sleep(1);
--retry;
}
- BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
}
void ClusterFixture::add() {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp Thu Aug 21 11:04:18 2008
@@ -42,13 +42,15 @@
uint count;
uint ack;
string queue;
-
+ bool declare;
+
Args() : count(0), ack(1)
{
addOptions()
("count", optValue(count, "N"), "number of messages to publish")
("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
- ("queue", optValue(queue, "<queue name>"), "queue to consume from");
+ ("queue", optValue(queue, "<queue name>"), "queue to consume from")
+ ("declare", optValue(declare), "declare the queue");
}
};
@@ -67,7 +69,8 @@
void consume()
{
-
+ if (opts.declare)
+ session.queueDeclare(opts.queue);
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(opts.ack));
subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
@@ -77,7 +80,7 @@
Message msg;
for (size_t i = 0; i < opts.count; ++i) {
msg=lq.pop();
- std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl;
+ QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
}
if (opts.ack != 0)
subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Thu Aug 21 11:04:18 2008
@@ -44,6 +44,7 @@
uint size;
uint count;
uint rate;
+ bool sync;
uint reportFrequency;
uint timeLimit;
uint queues;
@@ -65,6 +66,7 @@
("queues", optValue(queues, "N"), "number of queues")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+ ("sync", optValue(sync), "send messages synchronously")
("report-frequency", optValue(reportFrequency, "N"),
"number of milliseconds to wait between reports (ignored unless rate specified)")
("time-limit", optValue(timeLimit, "N"),
@@ -143,6 +145,7 @@
void sendByRate();
void sendByCount();
Receiver& receiver;
+ const string data;
public:
Sender(const string& queue, Receiver& receiver);
void test();
@@ -285,7 +288,7 @@
totalLatency = maxLatency = minLatency = 0;
}
-Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {}
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
void Sender::test()
{
@@ -295,7 +298,7 @@
void Sender::sendByCount()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -303,15 +306,15 @@
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
}
session.sync();
}
void Sender::sendByRate()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -324,9 +327,9 @@
while (true) {
uint64_t start_msg(current_time());
msg.getDeliveryProperties().setTimestamp(start_msg);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
-
+ if (opts.sync) session.sync();
+
uint64_t now = current_time();
if (timeLimit != 0 && (now - start) > timeLimit) {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Thu Aug 21 11:04:18 2008
@@ -16,7 +16,7 @@
if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port.
../qpidd -q
- with_ais_group ../qpidd $OPTS || exit 1
+ with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1
else
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-output=cluster$i.log $OPTS` || exit 1
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Aug 21 11:04:18 2008
@@ -26,12 +26,18 @@
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
<control name = "notify" code="0x1">
+ <role name="server" implement="MUST" />
<field name="url" type="str16" />
</control>
- <control name="connection-close" code="0x2"/>
+ <control name="connection-close" code="0x2">
+ <role name="server" implement="MUST" />
+ </control>
- <control name="connection-do-output" code="0x3"/>
+ <control name="connection-do-output" code="0x3">
+ <role name="server" implement="MUST" />
+ <field name="bytes" type="uint32"/>
+ </control>
</class>
</amqp>