You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/11 13:11:49 UTC
svn commit: r620468 [2/4] - in /incubator/qpid/branches/thegreatmerge: ./
qpid/bin/ qpid/cpp/ qpid/cpp/examples/ qpid/cpp/examples/examples/direct/
qpid/cpp/examples/examples/fanout/ qpid/cpp/examples/examples/pub-sub/
qpid/cpp/examples/examples/reques...
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h Mon Feb 11 04:11:03 2008
@@ -24,8 +24,10 @@
#include <memory>
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -39,10 +41,13 @@
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
{
framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
Connection& connection;
+ bool serverMode;
Handler(Connection& connection);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -55,6 +60,23 @@
void close(uint16_t replyCode, const std::string& replyText,
uint16_t classId, uint16_t methodId);
void closeOk();
+
+
+ void start(uint8_t versionMajor,
+ uint8_t versionMinor,
+ const qpid::framing::FieldTable& serverProperties,
+ const std::string& mechanisms,
+ const std::string& locales);
+
+ void secure(const std::string& challenge);
+
+ void tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat);
+
+ void openOk(const std::string& knownHosts);
+
+ void redirect(const std::string& host, const std::string& knownHosts);
};
std::auto_ptr<Handler> handler;
public:
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon Feb 11 04:11:03 2008
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -57,17 +58,19 @@
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(*this, *m))
+ if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
return;
- else if (session.get()) {
+ } else if (session.get()) {
boost::optional<SequenceNumber> ack=session->received(f);
session->in.handle(f);
if (ack)
peerSession.ack(*ack, SequenceNumberSet());
- }
- else if (!ignoring)
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (!ignoring) {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
@@ -186,6 +189,19 @@
void SessionHandler::solicitAck() {
assertAttached("solicit-ack");
peerSession.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+ // The session id argument is unused: a new session is opened with the
+ // SessionManager and this handler takes ownership of it straight away.
+ session.reset(
+ connection.broker.getSessionManager().open(*this, detachedLifetime).release());
+}
+
+void SessionHandler::detached() // one of the methods added for the client role: gives up this handler's session
+{
+ connection.broker.getSessionManager().suspend(session); // NOTE(review): presumably hands the session over to the manager - confirm against SessionManager::suspend
+ session.reset(); // make sure this handler no longer refers to a session afterwards
}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h Mon Feb 11 04:11:03 2008
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
@@ -43,6 +44,7 @@
*/
class SessionHandler : public framing::FrameHandler::InOutHandler,
public framing::AMQP_ServerOperations::SessionHandler,
+ public framing::AMQP_ClientOperations::SessionHandler,
private boost::noncopyable
{
public:
@@ -81,11 +83,16 @@
const framing::SequenceNumberSet& seenFrameSet);
void highWaterMark(uint32_t lastSentMark);
void solicitAck();
+
+ //extra methods required for assuming client role
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void detached();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
+
Connection& connection;
framing::ChannelHandler channel;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp Mon Feb 11 04:11:03 2008
@@ -43,13 +43,16 @@
SessionManager::~SessionManager() {}
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
- new SessionState(*this, h, timeout_, ack));
+ new SessionState(this, &h, timeout_, ack));
active.insert(session->getId());
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::opened, _1,boost::ref(*session)));
return session;
}
@@ -100,6 +103,10 @@
suspended.erase(suspended.begin(), keep);
}
}
+}
+
+void SessionManager::add(const intrusive_ptr<Observer>& o) { // register o; open() calls o->opened() for sessions created afterwards
+ observers.push_back(o); // NOTE(review): open() iterates observers while holding lock, but no lock is taken here - confirm add() never runs concurrently with open()
}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h Mon Feb 11 04:11:03 2008
@@ -25,6 +25,7 @@
#include <qpid/framing/Uuid.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -44,8 +45,17 @@
*/
class SessionManager : private boost::noncopyable {
public:
+ /**
+ * Observer notified of SessionManager events. The hook below defaults to a no-op; subclasses override it.
+ */
+ struct Observer : public RefCounted {
+ virtual void opened(SessionState&) {} // invoked by SessionManager::open() for each newly created session
+ };
+
SessionManager(uint32_t ack);
+
~SessionManager();
+
/** Open a new active session, caller takes ownership */
std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
@@ -59,9 +69,13 @@
*/
std::auto_ptr<SessionState> resume(const framing::Uuid&);
+ /** Add an Observer. */
+ void add(const intrusive_ptr<Observer>&);
+
private:
typedef boost::ptr_vector<SessionState> Suspended;
typedef std::set<framing::Uuid> Active;
+ typedef std::vector<intrusive_ptr<Observer> > Observers;
void erase(const framing::Uuid&);
void eraseExpired();
@@ -70,6 +84,7 @@
Suspended suspended;
Active active;
uint32_t ack;
+ Observers observers;
friend class SessionState; // removes deleted sessions from active set.
};
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb 11 04:11:03 2008
@@ -36,23 +36,17 @@
using qpid::management::Manageable;
using qpid::management::Args;
-void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
-
-void SessionState::handleOut(AMQFrame& f) {
- assert(handler);
- handler->out.handle(f);
-}
-
SessionState::SessionState(
- SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
- factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion()),
+ factory(f), handler(h), id(true), timeout(timeout_),
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
semanticHandler(new SemanticHandler(*this))
{
- // TODO aconway 2007-09-20: SessionManager may add plugin
- // handlers to the chain.
+ in.next = semanticHandler.get();
+ out.next = &handler->out;
+
getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
Manageable* parent = broker.GetVhostObject ();
@@ -66,8 +60,8 @@
mgmtObject = management::Session::shared_ptr
(new management::Session (this, parent, id.str ()));
mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
mgmtObject->set_detachedLifespan (getTimeout());
agent->addObject (mgmtObject);
}
@@ -76,12 +70,10 @@
SessionState::~SessionState() {
// Remove ID from active session list.
- factory.erase(getId());
-
+ if (factory)
+ factory->erase(getId());
if (mgmtObject.get () != 0)
- {
mgmtObject->resourceDestroy ();
- }
}
SessionHandler* SessionState::getHandler() {
@@ -101,7 +93,7 @@
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
- handler = 0;
+ handler = 0; out.next = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
@@ -112,6 +104,7 @@
{
Mutex::ScopedLock l(lock);
handler = &h;
+ out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb 11 04:11:03 2008
@@ -58,7 +58,7 @@
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::InOutHandler,
+ public framing::FrameHandler::Chains,
public sys::OutputControl,
public management::Manageable
{
@@ -90,18 +90,15 @@
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
-
- private:
- // SessionManager creates sessions.
- SessionState(SessionManager&,
- SessionHandler& out,
+ // Normally SessionManager creates sessions.
+ SessionState(SessionManager*,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
- SessionManager& factory;
+
+ private:
+ SessionManager* factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp Mon Feb 11 04:11:03 2008
@@ -85,17 +85,13 @@
void Timer::stop()
{
- signalStop();
- runner.join();
-}
-
-void Timer::signalStop()
-{
- Monitor::ScopedLock l(monitor);
- if (active) {
+ {
+ Monitor::ScopedLock l(monitor);
+ if (!active) return;
active = false;
monitor.notifyAll();
}
+ runner.join();
}
bool Later::operator()(const intrusive_ptr<TimerTask>& a,
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h Mon Feb 11 04:11:03 2008
@@ -59,7 +59,6 @@
bool active;
virtual void run();
- void signalStop();
public:
Timer();
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp Mon Feb 11 04:11:03 2008
@@ -29,12 +29,15 @@
#include "qpid/sys/Poller.h"
#include "qpid/Msg.h"
#include <boost/bind.hpp>
+#include <boost/format.hpp>
namespace qpid {
namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
+using boost::format;
+using boost::str;
Connector::Connector(
ProtocolVersion ver, bool _debug, uint32_t buffer_size
@@ -59,6 +62,7 @@
Mutex::ScopedLock l(closedLock);
assert(closed);
socket.connect(host, port);
+ identifier=str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
@@ -174,7 +178,9 @@
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) {}
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
+{
+}
Connector::Writer::~Writer() { delete buffer; }
@@ -182,6 +188,7 @@
Mutex::ScopedLock l(lock);
aio = a;
newBuffer(l);
+ identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % aio->getSocket().getPeerAddress());
}
void Connector::Writer::handle(framing::AMQFrame& frame) {
@@ -191,7 +198,7 @@
lastEof = frames.size();
aio->notifyPendingWrite();
}
- QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}
void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
@@ -234,7 +241,7 @@
AMQFrame frame;
while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ QPID_LOG(trace, "RECV " << identifier << ": " << frame);
input->received(frame);
}
// TODO: unreading needs to go away, and when we can cope
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h Mon Feb 11 04:11:03 2008
@@ -59,6 +59,7 @@
size_t lastEof; // Position after last EOF in frames
framing::Buffer encode;
size_t framesEncoded;
+ std::string identifier;
void writeOne(const sys::Mutex::ScopedLock&);
void newBuffer(const sys::Mutex::ScopedLock&);
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp Mon Feb 11 04:11:03 2008
@@ -47,11 +47,18 @@
void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
-bool LocalQueue::empty()
+bool LocalQueue::empty() const
{
if (!queue)
throw ClosedException();
- return queue->isEmpty();
+ return queue->empty();
+}
+
+size_t LocalQueue::size() const
+{
+ // With a queue present report its size; without one the local queue is closed.
+ if (queue)
+ return queue->size();
+ throw ClosedException();
}
}} // namespace qpid::client
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h Mon Feb 11 04:11:03 2008
@@ -44,8 +44,8 @@
*@exception ClosedException if subscription has been closed.
*/
Message pop();
- bool empty();
-
+ bool empty() const;
+ size_t size() const;
void setAckPolicy(AckPolicy);
private:
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb 11 04:11:03 2008
@@ -17,10 +17,12 @@
*/
#include "Cluster.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/scoped_array.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -30,7 +32,70 @@
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+using broker::SessionState;
+namespace {
+
+// Beginning of inbound chain: send to cluster. handle() multicasts each frame and then blocks until ClusterDeliverHandler clears busy.
+struct ClusterSendHandler : public FrameHandler {
+ SessionState& session; // session this handler was created for (stored; not otherwise used in this struct)
+ Cluster& cluster; // cluster connection used to send the frame
+ bool busy; // true from the send in handle() until ClusterDeliverHandler resets it; accessed under lock
+ Monitor lock; // guards busy; ClusterDeliverHandler notifies it to wake handle()
+
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+ void handle(AMQFrame& f) { // send f through the cluster, then wait for it to be delivered back
+ Mutex::ScopedLock l(lock); // ClusterDeliverHandler takes this same lock before clearing busy
+ assert(!busy); // at most one frame may be outstanding per handler
+ // FIXME aconway 2008-01-29: refcount Sessions.
+ // session.addRef(); // Keep the session till the message is self delivered.
+ cluster.send(f, next); // Indirectly send to next via cluster.
+
+ // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+ // But cluster needs to agree on order of side-effects on the shared model.
+ // OK for wiring to block, for messages use queue tokens?
+ // Both in & out transfers must be ordered per queue.
+ // May need out-of-order completion.
+ busy=true; // mark the frame as outstanding before waiting
+ while (busy) lock.wait(); // re-check busy after every wakeup
+ }
+};
+
+// Next in inbound chain, self delivered from cluster.
+// Passes the frame to the rest of the chain, then releases the
+// ClusterSendHandler that is blocked waiting for this delivery.
+struct ClusterDeliverHandler : public FrameHandler {
+ Cluster& cluster;
+ ClusterSendHandler& sender; // handler blocked in handle() until its busy flag is cleared
+
+ ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+
+ // Clear the sender's busy flag and wake the thread waiting on it.
+ void releaseSender() {
+ Mutex::ScopedLock l(sender.lock);
+ sender.busy=false;
+ sender.lock.notify();
+ }
+
+ void handle(AMQFrame& f) {
+ // The sender must be released even if the downstream handler throws:
+ // otherwise busy stays set and the sending thread waits forever.
+ try {
+ next->handle(f);
+ } catch (...) {
+ releaseSender();
+ throw; // the caller still sees the original exception
+ }
+ releaseSender();
+ }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) { // splice h in directly after c, ahead of c's previous successor
+ h->next = c.next; // h takes over the old successor first...
+ c.next = h; // ...then c is pointed at h
+}
+
+struct SessionObserver : public broker::SessionManager::Observer { // puts the cluster handlers on the inbound chain of each opened session
+ Cluster& cluster; // passed to every handler pair created in opened()
+ SessionObserver(Cluster& c) : cluster(c) {}
+
+ void opened(SessionState& s) { // Observer hook, called by SessionManager::open() for each new session
+ // FIXME aconway 2008-01-29: IList for memory management.
+ ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); // NOTE(review): no matching delete is visible in this code - see the FIXME above
+ ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+ insert(s.in, deliverer); // deliverer goes in first...
+ insert(s.in, sender); // ...so sender lands ahead of it: s.in -> sender -> deliverer -> previous successor
+ }
+};
+}
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -46,13 +111,11 @@
return out;
}
-Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) :
- FrameHandler(&sessions),
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
cpg(*this),
name(name_),
- url(url_),
- self(Id::self(cpg)),
- sessions(broker, *this)
+ url(url_),
+ observer(new SessionObserver(*this))
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -78,18 +141,19 @@
}
}
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
QPID_LOG(trace, *this << " SEND: " << frame);
- Buffer buf(frame.size());
+ char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(data);
frame.encode(buf);
- buf.flip();
- iovec iov = { buf.start(), frame.size() };
+ buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+ iovec iov = { data, frame.size()+sizeof(next) };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
- AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url));
- handle(frame);
+ AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
+ send(frame, 0);
}
size_t Cluster::size() const {
@@ -113,15 +177,25 @@
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
- handleClusterFrame(from, frame);
- else
- next->handle(frame);
+ try {
+ Id from(nodeid, pid);
+ Buffer buf(static_cast<char*>(msg), msg_len);
+ AMQFrame frame;
+ frame.decode(buf);
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
+ else if (from == self) {
+ FrameHandler* next;
+ buf.getRawData((uint8_t*)&next, sizeof(next));
+ next->handle(frame);
+ }
+ // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ }
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -144,6 +218,8 @@
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
+ if (!self.id && notifyIn->getUrl() == url.str())
+ self=from;
lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb 11 04:11:03 2008
@@ -19,13 +19,15 @@
*
*/
-#include "SessionManager.h"
#include "Cpg.h"
+#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Logger.h"
+#include "qpid/Url.h"
+
#include <boost/optional.hpp>
#include <boost/function.hpp>
@@ -36,21 +38,16 @@
namespace qpid { namespace cluster {
/**
- * Connection to the cluster. Maintains cluster membership
- * data.
- *
- * As FrameHandler, handles frames by sending them to the
- * cluster. Frames received from the cluster are sent to the next
- * FrameHandler in the chain.
+ * Connection to the cluster.
+ * Keeps cluster membership data.
*/
-class Cluster : public framing::FrameHandler,
- private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
struct Member {
- Member(const std::string& url_=std::string()) : url(url_) {}
- std::string url; ///< Broker address.
+ Member(const Url& url_=Url()) : url(url_) {}
+ Url url; ///< Broker address.
};
typedef std::vector<Member> MemberList;
@@ -60,11 +57,12 @@
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
- Cluster(const std::string& name, const std::string& url, broker::Broker&);
+ Cluster(const std::string& name, const Url& url, broker::Broker&);
virtual ~Cluster();
- framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+ // FIXME aconway 2008-01-29:
+ intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -83,7 +81,7 @@
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void handle(framing::AMQFrame&);
+ void send(framing::AMQFrame&, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
@@ -113,12 +111,12 @@
mutable sys::Monitor lock;
Cpg cpg;
Cpg::Name name;
- std::string url;
+ Url url;
Id self;
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- SessionManager sessions;
+ intrusive_ptr<broker::SessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb 11 04:11:03 2008
@@ -15,9 +15,12 @@
* limitations under the License.
*
*/
+#include <boost/program_options/value_semantic.hpp>
+
+
+
#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/SessionManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
@@ -25,25 +28,34 @@
#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
+
namespace qpid {
namespace cluster {
using namespace std;
-struct ClusterPlugin : public Plugin {
+struct ClusterOptions : public Options { // options of the cluster plugin (held by ClusterPlugin as its options member)
+ string name; // bound to --cluster-name; empty by default
+ string url; // bound to --cluster-url; when empty getUrl() builds a default
+
+ ClusterOptions() : Options("Cluster Options") {
+ addOptions() // register both options, each bound to the member above
+ ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
+ ("cluster-url", optValue(url,"URL"),
+ "URL of this broker, advertized to the cluster.\n"
+ "Defaults to a URL listing all the local IP addresses\n");
+ }
+
+ Url getUrl(uint16_t port) const { // URL for this broker: the configured one if set, otherwise Url::getIpAddressesUrl(port)
+ if (url.empty()) return Url::getIpAddressesUrl(port);
+ return Url(url);
+ }
+};
- struct ClusterOptions : public Options {
- string clusterName;
- ClusterOptions() : Options("Cluster Options") {
- addOptions()
- ("cluster", optValue(clusterName, "NAME"),
- "Joins the cluster named NAME");
- }
- };
+struct ClusterPlugin : public Plugin {
ClusterOptions options;
boost::optional<Cluster> cluster;
- boost::optional<SessionManager> sessions;
Options* getOptions() { return &options; }
@@ -52,10 +64,12 @@
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster-name option is set.
- if (broker && !options.clusterName.empty()) {
+ if (broker && !options.name.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker));
- broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter));
+ cluster = boost::in_place(options.name,
+ options.getUrl(broker->getPort()),
+ boost::ref(*broker));
+ broker->getSessionManager().add(cluster->getObserver());
}
}
};
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Feb 11 04:11:03 2008
@@ -145,13 +145,6 @@
return "Cannot mcast to CPG group "+group.str();
}
-uint32_t Cpg::getLocalNoideId() const {
- unsigned int nodeid;
- check(cpg_local_get(handle, &nodeid), "Cannot get local node ID");
- assert(nodeid <= std::numeric_limits<uint32_t>::max());
- return nodeid;
-}
-
ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
ostream_iterator<Cpg::Id> i(o, " ");
std::copy(a.first, a.first+a.second, i);
@@ -176,10 +169,6 @@
return out << string(name.value, name.length);
}
-
-Cpg::Id Cpg::Id::self(Cpg& cpg) {
- return Id(cpg.getLocalNoideId(), getpid());
-}
}} // namespace qpid::cluster
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h Mon Feb 11 04:11:03 2008
@@ -57,12 +57,10 @@
struct Id {
uint64_t id;
- Id() : id(0) {}
+ Id(uint64_t n=0) : id(n) {}
Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
- static Id self(Cpg& cpg);
-
operator uint64_t() const { return id; }
uint32_t nodeId() const { return id >> 32; }
// The two-argument constructor stores the pid in the whole low 32 bits, so mask all of them (0xFFFF dropped pids above 65535).
pid_t pid() const { return static_cast<pid_t>(id & 0xFFFFFFFFu); }
@@ -132,8 +130,6 @@
cpg_handle_t getHandle() const { return handle; }
- uint32_t getLocalNoideId() const;
-
private:
class Handles;
struct ClearHandleOnExit;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h Mon Feb 11 04:11:03 2008
@@ -24,14 +24,12 @@
#include "qpid/framing/MethodBodyDefaultVisitor.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
namespace qpid {
namespace framing {
-
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeartbeatBody;
-
/**
* Visitor for all concrete frame body types, combines
* AMQBodyConstVisitor and MethodBodyDefaultVisitor.
@@ -45,12 +43,12 @@
protected MethodBodyDefaultVisitor
{
virtual void defaultVisit(const AMQBody&) = 0;
+ void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
void visit(const AMQHeaderBody& b) { defaultVisit(b); }
void visit(const AMQContentBody& b) { defaultVisit(b); }
void visit(const AMQHeartbeatBody& b) { defaultVisit(b); }
void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); }
- void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
using AMQBodyConstVisitor::visit;
using MethodBodyDefaultVisitor::visit;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp Mon Feb 11 04:11:03 2008
@@ -18,14 +18,42 @@
#include "Statement.h"
#include "Logger.h"
+#include <boost/bind.hpp>
#include <stdexcept>
+#include <algorithm>
#include <syslog.h>
namespace qpid {
namespace log {
+namespace {
+using namespace std;
+
+// True for ASCII control characters: 0-31 and DEL (127).
+struct IsControl { bool operator()(unsigned char c) const { return c < 32 || c == 127; } };
+
+// True if str contains no control characters and can be logged verbatim.
+bool isClean(const std::string& str) {
+ return std::find_if(str.begin(), str.end(), IsControl()) == str.end();
+}
+
+// Return a copy of str with every control character replaced by its
+// two-character caret notation (newline becomes "^J", DEL becomes "^?").
+std::string quote(const std::string& str) {
+ IsControl isControl;
+ size_t n = std::count_if(str.begin(), str.end(), isControl);
+ std::string ret;
+ ret.reserve(str.size()+n); // Avoid extra allocations.
+ for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+ if (isControl(*i)) {
+ ret.push_back('^');
+ ret.push_back((*i)^64); // flip bit 6: 0-31 -> '@'..'_', 127 -> '?'
+ }
+ else ret.push_back(*i);
+ }
+ return ret;
+}
+
+}
+
+
void Statement::log(const std::string& message) {
- Logger::instance().log(*this,message);
+ Logger::instance().log(*this, isClean(message) ? message : quote(message));
}
Statement::Initializer::Initializer(Statement& s) : statement(s) {
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h Mon Feb 11 04:11:03 2008
@@ -38,6 +38,9 @@
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+ virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+
+ /** Note: this function is async-signal safe */
virtual void shutdown() = 0;
};
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Mon Feb 11 04:11:03 2008
@@ -30,6 +30,7 @@
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
+ void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
void shutdown();
uint16_t getPort() const;
@@ -92,13 +94,17 @@
bool initiated;
bool readError;
std::string identifier;
+ bool isClient;
+
+ void write(const framing::AMQDataBlock&);
public:
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
initiated(false),
- readError(false)
+ readError(false),
+ isClient(false)
{}
~AsynchIOHandler() {
@@ -107,6 +113,8 @@
delete inputHandler;
}
+ void setClient() { isClient = true; }
+
void init(AsynchIO* a, ConnectionInputHandler* h) {
aio = a;
inputHandler = h;
@@ -179,11 +187,50 @@
t[i].join();
}
}
+
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+ Socket* socket = new Socket();//Should be deleted by handle when socket closes
+ socket->connect(host, port);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ async->setClient();
+ ConnectionInputHandler* handler = f->create(async, *socket);
+ AsynchIO* aio = new AsynchIO(*socket,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+
+}
+
void AsynchIOAcceptor::shutdown() {
+ // NB: this function must be async-signal safe, it must not
+ // call any function that is not async-signal safe.
poller->shutdown();
}
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
// TODO: Need to find out if we are in the callback context,
@@ -274,6 +321,12 @@
}
void AsynchIOHandler::idle(AsynchIO&){
+ if (isClient && !initiated) {
+ //get & write protocol header from upper layers
+ write(inputHandler->getInitiation());
+ initiated = true;
+ return;
+ }
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h Mon Feb 11 04:11:03 2008
@@ -103,9 +103,13 @@
return closed;
}
- bool isEmpty() const {
+ bool empty() const {
Waitable::ScopedLock l(lock);
return queue.empty();
+ }
+ size_t size() const {
+ Waitable::ScopedLock l(lock);
+ return queue.size();
}
private:
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Mon Feb 11 04:11:03 2008
@@ -36,6 +36,7 @@
public TimeoutHandler, public OutputTask
{
public:
+ virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h Mon Feb 11 04:11:03 2008
@@ -96,6 +96,7 @@
Poller();
~Poller();
+ /** Note: this function is async-signal safe */
void shutdown();
void addFd(PollerHandle& handle, Direction dir);
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h Mon Feb 11 04:11:03 2008
@@ -87,7 +87,16 @@
* socket
*/
std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ std::string getLocalAddress() const;
+ uint getLocalPort() const;
+ uint getRemotePort() const;
+
+
/** Accept a connection from a socket that is already listening
* and has an incoming connection
*/
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Feb 11 04:11:03 2008
@@ -253,6 +253,9 @@
}
void Poller::shutdown() {
+ // NB: this function must be async-signal safe, it must not
+ // call any function that is not async-signal safe.
+
// Allow sloppy code to shut us down more than once
if (impl->isShutdown)
return;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp Mon Feb 11 04:11:03 2008
@@ -30,6 +30,7 @@
#include <sys/errno.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <cstdlib>
#include <boost/format.hpp>
@@ -45,6 +46,7 @@
int fd;
std::string getName(bool local, bool includeService = false) const;
+ std::string getService(bool local) const;
};
std::string SocketPrivate::getName(bool local, bool includeService) const
@@ -77,6 +79,28 @@
}
}
+/**
+ * Numeric service (port) string for one end of the socket: the local
+ * end when `local` is true, otherwise the peer end. Failures of the
+ * socket calls are reported through QPID_POSIX_CHECK / QPID_POSIX_ERROR.
+ */
+std::string SocketPrivate::getService(bool local) const
+{
+ ::sockaddr_storage name; // big enough for any socket address
+ ::socklen_t namelen = sizeof(name);
+
+ int result = -1;
+ if (local) {
+ result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+ } else {
+ result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+ }
+
+ QPID_POSIX_CHECK(result);
+
+ char servName[NI_MAXSERV];
+ // Capture the return code before comparing it: "int rc = f() != 0"
+ // would store the boolean result of the comparison instead.
+ int rc = ::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (rc != 0)
+ throw QPID_POSIX_ERROR(rc);
+ return servName;
+}
+
Socket::Socket() :
impl(new SocketPrivate)
{
@@ -229,6 +253,21 @@
std::string Socket::getPeerAddress() const
{
return impl->getName(false, true);
+}
+
+std::string Socket::getLocalAddress() const
+{
+ return impl->getName(true, true);
+}
+
+uint Socket::getLocalPort() const
+{
+ return atoi(impl->getService(true).c_str());
+}
+
+/** Port number of the remote (peer) end of the socket. */
+uint Socket::getRemotePort() const
+{
+ return atoi(impl->getService(false).c_str()); // false selects the peer end, not the local one
}
int Socket::toFd() const {
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp Mon Feb 11 04:11:03 2008
@@ -120,8 +120,9 @@
shared_ptr<Broker> brokerPtr;
auto_ptr<QpiddOptions> options;
-void shutdownHandler(int signal){
- QPID_LOG(notice, "Shutting down on signal " << signal);
+void shutdownHandler(int /*signal*/){
+ // Note: do not call any async-signal unsafe functions here.
+ // Do any extra shutdown actions in main() after broker->run()
brokerPtr->shutdown();
}
@@ -155,7 +156,7 @@
void loadModuleDir (string dirname, bool isDefault)
{
- fs::path dirPath (dirname);
+ fs::path dirPath (dirname, fs::native);
if (!fs::exists (dirPath))
{
@@ -245,7 +246,8 @@
brokerPtr.reset(new Broker(options->broker));
if (options->broker.port == 0)
cout << uint16_t(brokerPtr->getPort()) << endl;
- brokerPtr->run();
+ brokerPtr->run();
+ QPID_LOG(notice, "Shutting down.");
}
return 0;
}
Propchange: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Feb 11 04:11:03 2008
@@ -14,3 +14,5 @@
qpidd.vglog
txtest
latencytest
+ais_test
+cluster.ports
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h Mon Feb 11 04:11:03 2008
@@ -43,7 +43,10 @@
BrokerFixture() {
Broker::Options opts;
opts.port=0;
+ // Management doesn't play well with multiple in-process brokers.
+ opts.enableMgmt=false;
opts.workerThreads=1;
+ opts.dataDir="";
broker = Broker::create(opts);
// TODO aconway 2007-12-05: At one point BrokerFixture
// tests could hang in Connection ctor if the following
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Feb 11 04:11:03 2008
@@ -18,35 +18,42 @@
* under the License.
*
*/
-#include "qpid_test_plugin.h"
+#include "unit_test.h"
#include "BrokerFixture.h"
#include "qpid/client/Dispatcher.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
#include "qpid/client/Session_0_10.h"
#include "qpid/framing/TransferContent.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
-#include <list>
+#include <vector>
+
+QPID_AUTO_TEST_SUITE(ClientSessionTest)
using namespace qpid::client;
using namespace qpid::client::arg;
using namespace qpid::framing;
using namespace qpid;
+using std::string;
+using std::cout;
+using std::endl;
using namespace boost;
-struct DummyListener : public MessageListener
-{
- std::list<Message> messages;
- std::string name;
+
+struct DummyListener : public sys::Runnable, public MessageListener {
+ std::vector<Message> messages;
+ string name;
uint expected;
- uint count;
Dispatcher dispatcher;
- DummyListener(Session_0_10& session, const std::string& _name, uint _expected) : name(_name), expected(_expected), count(0),
- dispatcher(session) {}
+ DummyListener(Session_0_10& session, const string& n, uint ex) :
+ name(n), expected(ex), dispatcher(session) {}
- void listen()
+ void run()
{
dispatcher.listen(name, this);
dispatcher.run();
@@ -55,117 +62,127 @@
void received(Message& msg)
{
messages.push_back(msg);
- if (++count == expected) {
+ if (--expected == 0)
dispatcher.stop();
- }
}
};
-class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture
+struct ClientSessionFixture : public ProxySessionFixture
{
- CPPUNIT_TEST_SUITE(ClientSessionTest);
- CPPUNIT_TEST(testQueueQuery);
- CPPUNIT_TEST(testTransfer);
- CPPUNIT_TEST(testDispatcher);
- CPPUNIT_TEST(testResumeExpiredError);
- CPPUNIT_TEST(testUseSuspendedError);
- CPPUNIT_TEST(testSuspendResume);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void declareSubscribe(const std::string& q="my-queue",
- const std::string& dest="my-dest")
+ void declareSubscribe(const string& q="my-queue",
+ const string& dest="my-dest")
{
session.queueDeclare(queue=q);
session.messageSubscribe(queue=q, destination=dest, acquireMode=1);
session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);//messages
session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
}
+};
- void testQueueQuery()
- {
- session =connection.newSession();
- session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
- TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue"));
- CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
- CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
- CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"),
- result.get().getAlternateExchange());
- }
+BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) {
+ session =connection.newSession();
+ session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
+ TypedResult<QueueQueryResult> result = session.queueQuery(string("my-queue"));
+ BOOST_CHECK_EQUAL(false, result.get().getDurable());
+ BOOST_CHECK_EQUAL(true, result.get().getExclusive());
+ BOOST_CHECK_EQUAL(string("amq.fanout"),
+ result.get().getAlternateExchange());
+}
- void testTransfer()
- {
- session =connection.newSession();
- declareSubscribe();
- session.messageTransfer(content=TransferContent("my-message", "my-queue"));
- //get & test the message:
- FrameSet::shared_ptr msg = session.get();
- CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
- CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
- //confirm receipt:
- session.getExecution().completed(msg->getId(), true, true);
- }
+BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture)
+{
+ session=connection.newSession();
+ declareSubscribe();
+ session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+ //get & test the message:
+ FrameSet::shared_ptr msg = session.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+ //confirm receipt:
+ session.getExecution().completed(msg->getId(), true, true);
+}
- void testDispatcher()
- {
- session =connection.newSession();
- declareSubscribe();
+BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture)
+{
+ session =connection.newSession();
+ declareSubscribe();
+ size_t count = 100;
+ for (size_t i = 0; i < count; ++i)
+ session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue"));
+ DummyListener listener(session, "my-dest", count);
+ listener.run();
+ BOOST_REQUIRE_EQUAL(count, listener.messages.size());
+ for (size_t i = 0; i < count; ++i)
+ BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
+}
- TransferContent msg1("One");
- msg1.getDeliveryProperties().setRoutingKey("my-queue");
- session.messageTransfer(content=msg1);
-
- TransferContent msg2("Two");
- msg2.getDeliveryProperties().setRoutingKey("my-queue");
- session.messageTransfer(content=msg2);
-
- TransferContent msg3("Three");
- msg3.getDeliveryProperties().setRoutingKey("my-queue");
- session.messageTransfer(content=msg3);
-
- DummyListener listener(session, "my-dest", 3);
- listener.listen();
- CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size());
- CPPUNIT_ASSERT_EQUAL(std::string("One"), listener.messages.front().getData());
- listener.messages.pop_front();
- CPPUNIT_ASSERT_EQUAL(std::string("Two"), listener.messages.front().getData());
- listener.messages.pop_front();
- CPPUNIT_ASSERT_EQUAL(std::string("Three"), listener.messages.front().getData());
- listener.messages.pop_front();
+/* FIXME aconway 2008-01-28: hangs
+BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
+{
+ session =connection.newSession();
+ declareSubscribe();
+ size_t count = 10000;
+ DummyListener listener(session, "my-dest", count);
+ sys::Thread t(listener);
+ for (size_t i = 0; i < count; ++i) {
+ session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue"));
+ if (i%100 == 0) cout << "T" << i << std::flush;
+ }
+ t.join();
+ BOOST_REQUIRE_EQUAL(count, listener.messages.size());
+ for (size_t i = 0; i < count; ++i)
+ BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
+}
+*/
- }
+// Resuming a session that was created with a 0 timeout and then
+// suspended must be rejected with InternalErrorException.
+BOOST_FIXTURE_TEST_CASE(testResumeExpiredError, ClientSessionFixture)
+{
+ session =connection.newSession(0);
+ session.suspend(); // session has 0 timeout.
+ try {
+ connection.resume(session);
+ BOOST_FAIL("Expected InternalErrorException.");
+ } catch(const InternalErrorException&) {}
+}
- void testResumeExpiredError() {
- session =connection.newSession(0);
- session.suspend(); // session has 0 timeout.
- try {
- connection.resume(session);
- CPPUNIT_FAIL("Expected InvalidArgumentException.");
- } catch(const InternalErrorException&) {}
- }
+BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture)
+{
+ session =connection.newSession(60);
+ session.suspend();
+ try {
+ session.exchangeQuery(name="amq.fanout");
+ BOOST_FAIL("Expected session suspended exception");
+ } catch(const CommandInvalidException&) {}
+}
- void testUseSuspendedError() {
- session =connection.newSession(60);
- session.suspend();
- try {
- session.exchangeQuery(name="amq.fanout");
- CPPUNIT_FAIL("Expected session suspended exception");
- } catch(const CommandInvalidException&) {}
+BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture)
+{
+ session =connection.newSession(60);
+ declareSubscribe();
+ session.suspend();
+ // Make sure we are still subscribed after resume.
+ connection.resume(session);
+ session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+ FrameSet::shared_ptr msg = session.get();
+ BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+}
+
+BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) {
+ // https://bugzilla.redhat.com/show_bug.cgi?id=410551
+ // Deadlock if SubscriptionManager run() concurrent with session ack.
+ LocalQueue myq;
+ session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
+ subs.subscribe(myq, "myq");
+ string data("msg");
+ Message msg(data, "myq");
+ const int count=100; // Verified with count=100000 in a loop.
+ for (int i = 0; i < count; ++i)
+ session.messageTransfer(content=msg);
+ for (int j = 0; j < count; ++j) {
+ Message m=myq.pop();
+ BOOST_CHECK_EQUAL(m.getData(), data);
}
+}
- void testSuspendResume() {
- session =connection.newSession(60);
- declareSubscribe();
- session.suspend();
- // Make sure we are still subscribed after resume.
- connection.resume(session);
- session.messageTransfer(content=TransferContent("my-message", "my-queue"));
- FrameSet::shared_ptr msg = session.get();
- CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
- }
-};
+QPID_AUTO_TEST_SUITE_END()
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest);
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp Mon Feb 11 04:11:03 2008
@@ -78,12 +78,16 @@
cpg_handle_t /*handle*/,
struct cpg_name *grp,
struct cpg_address */*members*/, int nMembers,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
+ struct cpg_address */*left*/, int nLeft,
+ struct cpg_address */*joined*/, int nJoined
)
{
BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
configChanges.push_back(nMembers);
+ BOOST_MESSAGE("configChange: "<<
+ nLeft<<" left "<<
+ nJoined<<" joined "<<
+ nMembers<<" members.");
}
};
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp Mon Feb 11 04:11:03 2008
@@ -152,6 +152,10 @@
}
+BOOST_AUTO_TEST_CASE(testEmptyDtor) {
+ TestList l;
+}
+
BOOST_FIXTURE_TEST_CASE(testOwnership, Fixture) {
{
TestList l2;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am Mon Feb 11 04:11:03 2008
@@ -36,7 +36,8 @@
Url.cpp Uuid.cpp \
Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp \
InlineVector.cpp \
- IList.cpp
+ IList.cpp \
+ ClientSessionTest.cpp
check_LTLIBRARIES += libshlibtest.la
libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -78,8 +79,7 @@
TxAckTest \
TxBufferTest \
TxPublishTest \
- MessageBuilderTest \
- ClientSessionTest
+ MessageBuilderTest
#client_unit_tests = \
# ClientChannelTest
@@ -109,7 +109,7 @@
check_PROGRAMS += $(testprogs) interop_runner
-TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
+TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest
TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker
@@ -123,7 +123,6 @@
.valgrind.supp \
.valgrindrc \
MessageUtils.h \
- MockChannel.h \
MockConnectionInputHandler.h \
TxMocks.h \
qpid_test_plugin.h
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp Mon Feb 11 04:11:03 2008
@@ -22,7 +22,6 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldValue.h"
-#include "MockChannel.h"
#include "qpid_test_plugin.h"
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid_test_plugin.h"
#include <iostream>
-#include "MockChannel.h"
#include "boost/format.hpp"
using namespace qpid;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h Mon Feb 11 04:11:03 2008
@@ -37,8 +37,8 @@
TestOptions(const std::string& helpText_=std::string()) :
Options("Test Options"),
host("localhost"), port(TcpAddress::DEFAULT_PORT),
- clientid("cpp"), help(false),
- helpText(helpText_)
+ clientid("cpp"), username("guest"), password("guest"),
+ help(false), helpText(helpText_)
{
addOptions()
("host,h", optValue(host, "HOST"), "Broker host to connect to")
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
#include <iostream>
#include <list>
#include <vector>
-#include "MockChannel.h"
using std::list;
using std::vector;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
#include <iostream>
#include <list>
#include <vector>
-#include "MockChannel.h"
#include "MessageUtils.h"
using std::list;
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp Mon Feb 11 04:11:03 2008
@@ -33,7 +33,7 @@
url.push_back(TcpAddress("foo.com"));
url.push_back(TcpAddress("bar.com", 6789));
BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str());
- BOOST_CHECK_EQUAL("amqp:", Url().str());
+ BOOST_CHECK(Url().str().empty());
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check Mon Feb 11 04:11:03 2008
@@ -2,10 +2,12 @@
# Check for requirements, run AIS tests if found.
#
-test `id -ng` = "ais" || BADGROUP=yes
-{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes
+id -nG | grep '\<ais\>' || \
+ NOGROUP="You are not a member of the ais group."
+ps -u root | grep aisexec >/dev/null || \
+ NOAISEXEC="The aisexec daemon is not running as root"
-if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
+if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
cat <<EOF
=========== WARNING: NOT RUNNING AIS TESTS ==============
@@ -13,18 +15,8 @@
Tests that depend on the openais library (used for clustering)
will not be run because:
-EOF
- test -n "$BADGROUP" && cat <<EOF
- You do not appear to have you group ID set to "ais". Make ais your
- primary group, or run "newgrp ais" before running the tests.
-
-EOF
- test -n "$NOAISEXEC" && cat <<EOF
- The aisexec daemon is not running. Make sure /etc/ais/openais.conf
- is a valid configuration and aisexec is run by root.
-EOF
-
- cat <<EOF
+ $NOGROUP
+ $NOAISEXEC
==========================================================
@@ -32,8 +24,4 @@
exit 0; # A warning, not a failure.
fi
-FAILED=0
-for test in `cat ais_tests`; do
- ./$test || FAILED=`expr $FAILED + 1`
-done
-exit $FAILED
+echo ./ais_run | newgrp ais
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk Mon Feb 11 04:11:03 2008
@@ -1,55 +1,20 @@
-# FIXME aconway 2007-08-31: Disabled cluster compilation,
-# has not been kept up to date with recent commits.
+if CPG
+#
+# Cluster tests makefile fragment, to be included in Makefile.am
#
-# if CLUSTER
-# # Cluster tests makefile fragment, to be included in Makefile.am
-# #
+lib_cluster = $(abs_builddir)/../libqpidcluster.la
-# lib_cluster = $(abs_builddir)/../libqpidcluster.la
-
-# # NOTE: Programs using the openais library must be run with gid=ais
-# # You should do "newgrp ais" before running the tests to run these.
-# #
-
-# #
-# # Cluster tests.
-# #
-
-# # ais_check runs ais if the conditions to run AIS tests
-# # are met, otherwise it prints a warning.
-# TESTS+=ais_check
-# EXTRA_DIST+=ais_check
-# AIS_TESTS=
-
-# ais_check: ais_tests
-# ais_tests:
-# echo $(AIS_TESTS)
-# echo "# AIS tests" >$@
-# for t in $(AIS_TESTS); do echo ./$$t >$@; done
-# chmod a+x $@
-
-# CLEANFILES+=ais_tests
-
-# AIS_TESTS+=Cpg
-# check_PROGRAMS+=Cpg
-# Cpg_SOURCES=Cpg.cpp
-# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# # TODO aconway 2007-07-26: Fix this test.
-# #AIS_TESTS+=Cluster
-# # check_PROGRAMS+=Cluster
-# # Cluster_SOURCES=Cluster.cpp Cluster.h
-# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# check_PROGRAMS+=Cluster_child
-# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
-# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+# NOTE: Programs using the openais library must be run with gid=ais
+# You should do "newgrp ais" before running the tests to run these.
+#
-# # TODO aconway 2007-07-03: In progress
-# #AIS_TESTS+=cluster_client
-# check_PROGRAMS+=cluster_client
-# cluster_client_SOURCES=cluster_client.cpp
-# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework
+# ais_check checks conditions for AIS tests and runs if ok.
+TESTS+=ais_check
+EXTRA_DIST+=ais_check ais_run
+
+check_PROGRAMS+=ais_test
+ais_test_SOURCES=ais_test.cpp Cpg.cpp
+ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
-# endif
+endif
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp Mon Feb 11 04:11:03 2008
@@ -16,21 +16,25 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/shared_ptr.h"
-
#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/client/Session.h"
#include <fstream>
#include <vector>
#include <functional>
-
QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
-using namespace std;
using namespace qpid;
using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::client::arg;
+using framing::TransferContent;
+using std::vector;
+using std::string;
+using std::ifstream;
+using std::ws;
struct ClusterConnections : public vector<shared_ptr<Connection> > {
ClusterConnections() {
@@ -58,25 +62,23 @@
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
- Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE);
- Queue fooQ("FooQ");
-
- Channel broker0;
- cluster[0]->openChannel(broker0);
- broker0.declareExchange(fooEx);
- broker0.declareQueue(fooQ);
- broker0.bind(fooEx, fooQ, "FooKey");
+ Session broker0 = cluster[0]->newSession();
+ broker0.exchangeDeclare(exchange="ex");
+ broker0.queueDeclare(queue="q");
+ broker0.queueBind(exchange="ex", queue="q", routingKey="key");
broker0.close();
for (size_t i = 1; i < cluster.size(); ++i) {
- Channel ch;
- cluster[i]->openChannel(ch);
- ch.publish(Message("hello"), fooEx, "FooKey");
- Message m;
- BOOST_REQUIRE(ch.get(m, fooQ));
- BOOST_REQUIRE_EQUAL(m.getData(), "hello");
- ch.close();
- }
+ Session s = cluster[i]->newSession();
+ s.messageTransfer(content=TransferContent("data", "key", "ex"));
+ s.messageSubscribe(queue="q", destination="q");
+ s.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = s.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ s.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
}
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp Mon Feb 11 04:11:03 2008
@@ -367,4 +367,24 @@
unlink("logging.tmp");
}
+BOOST_AUTO_TEST_CASE(testQuoteControlChars) {
+ Logger& l=Logger::instance();
+ l.clear();
+ Options opts;
+ opts.outputs.clear();
+ opts.outputs.push_back("logging.tmp");
+ opts.time=false;
+ l.configure(opts, "test");
+ char s[] = "null\0tab\tspace newline\nret\r";
+ string str(s, sizeof(s));
+ QPID_LOG(critical, str);
+ ifstream log("logging.tmp");
+ string line;
+ getline(log, line);
+ string expect="critical null^@tab^Ispace newline^Jret^M^@";
+ BOOST_CHECK_EQUAL(expect, line);
+ log.close();
+ unlink("logging.tmp");
+}
+
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp Mon Feb 11 04:11:03 2008
@@ -38,6 +38,8 @@
#include <sstream>
#include <numeric>
#include <algorithm>
+#include <unistd.h>
+
using namespace std;
using namespace qpid;
@@ -90,6 +92,8 @@
size_t iterations;
Mode mode;
bool summary;
+ uint32_t intervalSub;
+ uint32_t intervalPub;
static const std::string helpText;
@@ -98,7 +102,8 @@
setup(false), control(false), publish(false), subscribe(false),
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false),
subs(1), ack(0),
- qt(1), iterations(1), mode(SHARED), summary(false)
+ qt(1), iterations(1), mode(SHARED), summary(false),
+ intervalSub(0), intervalPub(0)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -127,7 +132,10 @@
("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
("queue_max_count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
- ("queue_max_size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'");
+ ("queue_max_size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+
+ ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
+ ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish");
}
// Computed values
@@ -454,6 +462,7 @@
arg::destination=destination,
arg::content=msg,
arg::confirmMode=opts.confirm);
+ if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
if (opts.confirm) completion.sync();
AbsTime end=now();
@@ -523,6 +532,7 @@
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
+ if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for
// multiple publishers. Need an array of counters,
// one per publisher and a publisher ID in the
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster Mon Feb 11 04:11:03 2008
@@ -12,7 +12,7 @@
OPTS=$*
CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
for (( i=0; i<SIZE; ++i )); do
- PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+ PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1
echo $PORT >> cluster.ports
done
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd Mon Feb 11 04:11:03 2008
@@ -1,30 +1,30 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
- JAVA_VM=-server \
- JAVA_MEM=-Xmx1024m \
- QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.tools.security.Passwd "$@"
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.tools.security.Passwd "$@"
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server Mon Feb 11 04:11:03 2008
@@ -1,31 +1,31 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
- JAVA_VM=-server \
- JAVA_MEM=-Xmx1024m \
- JAVA_GC="-XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
- QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.server.Main "$@"
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ JAVA_GC="-XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
+ QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.server.Main "$@"
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
------------------------------------------------------------------------------
svn:executable = *