You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/02/13 16:40:01 UTC
svn commit: r627484 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: Url.cpp
Url.h broker/Broker.cpp broker/Broker.h framing/AMQBody.cpp
framing/AMQBody.h sys/Acceptor.h sys/AsynchIOAcceptor.cpp
Author: aconway
Date: Wed Feb 13 07:39:59 2008
New Revision: 627484
URL: http://svn.apache.org/viewvc?rev=627484&view=rev
Log:
Broker::connect - connect to URL, return ConnectionInputHandler.
M src/qpid/broker/Broker.cpp
M src/qpid/broker/Broker.h
M src/qpid/sys/Acceptor.h
M src/qpid/sys/AsynchIOAcceptor.cpp
AMQBody::match - test for matching frames.
M src/qpid/framing/AMQBody.cpp
M src/qpid/framing/AMQBody.h
Url::throwIfEmpty() - test for empty URL.
M src/qpid/Url.cpp
M src/qpid/Url.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp Wed Feb 13 07:39:59 2008
@@ -160,6 +160,10 @@
clear();
}
+void Url::throwIfEmpty() const {
+ throw InvalidUrl("URL contains no addresses");
+}
+
std::istream& operator>>(std::istream& is, Url& url) {
std::string s;
is >> s;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Wed Feb 13 07:39:59 2008
@@ -78,6 +78,9 @@
template<class T> Url& operator=(T s) { parse(s); return *this; }
+ /** Throw InvalidUrl if the URL does not contain any addresses. */
+ void throwIfEmpty() const;
+
/** Replace contents with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
*@exception InvalidUrl if the url is invalid.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 13 07:39:59 2008
@@ -40,6 +40,7 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/Url.h"
#include <boost/bind.hpp>
@@ -263,11 +264,13 @@
case management::Broker::METHOD_ECHO :
status = Manageable::STATUS_OK;
break;
- case management::Broker::METHOD_CONNECT :
- connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+ case management::Broker::METHOD_CONNECT : {
+ management::ArgsBrokerConnect& hp=
+ dynamic_cast<management::ArgsBrokerConnect&>(args);
+ connect(hp.i_host, hp.i_port);
status = Manageable::STATUS_OK;
break;
-
+ }
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -277,9 +280,19 @@
return status;
}
-void Broker::connect(management::ArgsBrokerConnect& args)
+sys::ConnectionInputHandler* Broker::connect(
+ const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* f)
+{
+ return getAcceptor().connect(host, port, f ? f : &factory);
+}
+
+sys::ConnectionInputHandler* Broker::connect(
+ const Url& url, sys::ConnectionInputHandlerFactory* f)
{
- getAcceptor().connect(args.i_host, args.i_port, &factory);
+ url.throwIfEmpty();
+ TcpAddress addr=boost::get<TcpAddress>(url[0]);
+ return connect(addr.host, addr.port, f);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Feb 13 07:39:59 2008
@@ -47,6 +47,9 @@
#include <vector>
namespace qpid {
+
+class Url;
+
namespace broker {
static const uint16_t DEFAULT_PORT=5672;
@@ -54,7 +57,8 @@
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable
+class Broker : public sys::Runnable, public Plugin::Target,
+ public management::Manageable
{
public:
@@ -111,6 +115,14 @@
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* =0);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+
private:
sys::Acceptor& getAcceptor() const;
@@ -129,7 +141,6 @@
Vhost::shared_ptr vhostObject;
void declareStandardExchange(const std::string& name, const std::string& type);
- void connect(management::ArgsBrokerConnect& args);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp Wed Feb 13 07:39:59 2008
@@ -19,15 +19,46 @@
*
*/
-#include "AMQBody.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
#include <iostream>
-std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body)
+namespace qpid {
+namespace framing {
+
+std::ostream& operator<<(std::ostream& out, const AMQBody& body)
{
body.print(out);
return out;
}
-qpid::framing::AMQBody::~AMQBody() {}
+AMQBody::~AMQBody() {}
+
+namespace {
+struct MatchBodies : public AMQBodyConstVisitor {
+ const AMQBody& body;
+ bool match;
+
+ MatchBodies(const AMQBody& b) : body(b), match(false) {}
+ virtual ~MatchBodies() {}
+
+ virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); }
+ virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); }
+ virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); }
+ virtual void visit(const AMQMethodBody& x) {
+ const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body);
+ match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId());
+ }
+};
+}
+bool AMQBody::match(const AMQBody& a, const AMQBody& b) {
+ MatchBodies matcher(a);
+ b.accept(matcher);
+ return matcher.match;
+}
+}} // namespace
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h Wed Feb 13 07:39:59 2008
@@ -59,6 +59,9 @@
virtual AMQMethodBody* getMethod() { return 0; }
virtual const AMQMethodBody* getMethod() const { return 0; }
+
+ /** Match if same type and same class/method ID for methods */
+ static bool match(const AMQBody& , const AMQBody& );
};
std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Wed Feb 13 07:39:59 2008
@@ -29,6 +29,7 @@
namespace sys {
class ConnectionInputHandlerFactory;
+class ConnectionInputHandler;
class Acceptor : public qpid::SharedObject<Acceptor>
{
@@ -38,7 +39,9 @@
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
- virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+ virtual ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory) = 0;
/** Note: this function is async-signal safe */
virtual void shutdown() = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Wed Feb 13 07:39:59 2008
@@ -54,7 +54,10 @@
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
- void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
+ ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory);
+
void shutdown();
uint16_t getPort() const;
@@ -188,7 +191,7 @@
}
}
-void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -209,7 +212,7 @@
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
-
+ return handler;
}