You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/02/13 16:40:01 UTC

svn commit: r627484 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: Url.cpp Url.h broker/Broker.cpp broker/Broker.h framing/AMQBody.cpp framing/AMQBody.h sys/Acceptor.h sys/AsynchIOAcceptor.cpp

Author: aconway
Date: Wed Feb 13 07:39:59 2008
New Revision: 627484

URL: http://svn.apache.org/viewvc?rev=627484&view=rev
Log:
Broker::connect - connect to URL, return ConnectionInputHandler.
M      src/qpid/broker/Broker.cpp
M      src/qpid/broker/Broker.h
M      src/qpid/sys/Acceptor.h
M      src/qpid/sys/AsynchIOAcceptor.cpp

AMQBody::match - test for matching frames.
M      src/qpid/framing/AMQBody.cpp
M      src/qpid/framing/AMQBody.h

Url::throwIfEmpty() - test for empty URL.
M      src/qpid/Url.cpp
M      src/qpid/Url.h

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp Wed Feb 13 07:39:59 2008
@@ -160,6 +160,10 @@
         clear();
 }
 
+void Url::throwIfEmpty() const { // Throws InvalidUrl only when the URL holds no addresses.
+    if (empty()) throw InvalidUrl("URL contains no addresses");
+}
+
 std::istream& operator>>(std::istream& is, Url& url) {
     std::string s;
     is >> s;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Wed Feb 13 07:39:59 2008
@@ -78,6 +78,9 @@
 
     template<class T> Url& operator=(T s) { parse(s); return *this; }
     
+    /** Throw InvalidUrl if the URL does not contain any addresses. */
+    void throwIfEmpty() const;
+
     /** Replace contents with parsed URL as defined in
      * https://wiki.108.redhat.com/jira/browse/AMQP-95
      *@exception InvalidUrl if the url is invalid.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 13 07:39:59 2008
@@ -40,6 +40,7 @@
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/TimeoutHandler.h"
 #include "qpid/sys/SystemInfo.h"
+#include "qpid/Url.h"
 
 #include <boost/bind.hpp>
 
@@ -263,11 +264,13 @@
     case management::Broker::METHOD_ECHO :
         status = Manageable::STATUS_OK;
         break;
-    case management::Broker::METHOD_CONNECT :
-        connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+      case management::Broker::METHOD_CONNECT : {
+        management::ArgsBrokerConnect& hp=
+            dynamic_cast<management::ArgsBrokerConnect&>(args);
+        connect(hp.i_host, hp.i_port);
         status = Manageable::STATUS_OK;
         break;
-
+      }
     case management::Broker::METHOD_JOINCLUSTER :
     case management::Broker::METHOD_LEAVECLUSTER :
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -277,9 +280,19 @@
     return status;
 }
 
-void Broker::connect(management::ArgsBrokerConnect& args)
+sys::ConnectionInputHandler* Broker::connect(
+    const std::string& host, uint16_t port,
+    sys::ConnectionInputHandlerFactory* f)
+{
+    return getAcceptor().connect(host, port, f ? f : &factory);
+}
+
+sys::ConnectionInputHandler* Broker::connect(
+    const Url& url, sys::ConnectionInputHandlerFactory* f)
 {
-    getAcceptor().connect(args.i_host, args.i_port, &factory);
+    url.throwIfEmpty();
+    TcpAddress addr=boost::get<TcpAddress>(url[0]);
+    return connect(addr.host, addr.port, f);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Feb 13 07:39:59 2008
@@ -47,6 +47,9 @@
 #include <vector>
 
 namespace qpid { 
+
+class Url;
+
 namespace broker {
 
 static const  uint16_t DEFAULT_PORT=5672;
@@ -54,7 +57,8 @@
 /**
  * A broker instance. 
  */
-class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable
+class Broker : public sys::Runnable, public Plugin::Target,
+               public management::Manageable
 {
   public:
 
@@ -111,6 +115,14 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
     
+    /** Create a connection to another broker. */
+    sys::ConnectionInputHandler*
+    connect(const std::string& host, uint16_t port,
+            sys::ConnectionInputHandlerFactory* =0);
+    /** Create a connection to another broker. */
+    sys::ConnectionInputHandler*
+    connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+
   private:
     sys::Acceptor& getAcceptor() const;
 
@@ -129,7 +141,6 @@
     Vhost::shared_ptr              vhostObject;
 
     void declareStandardExchange(const std::string& name, const std::string& type);
-    void connect(management::ArgsBrokerConnect& args);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.cpp Wed Feb 13 07:39:59 2008
@@ -19,15 +19,46 @@
  *
  */
 
-#include "AMQBody.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
 #include <iostream>
 
-std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body) 
+namespace qpid {
+namespace framing {
+
+std::ostream& operator<<(std::ostream& out, const AMQBody& body) 
 {
     body.print(out);
     return out;
 }
 
-qpid::framing::AMQBody::~AMQBody() {}
+AMQBody::~AMQBody() {}
+
+namespace {
+struct MatchBodies : public AMQBodyConstVisitor {
+    const AMQBody& body;
+    bool match;
+
+    MatchBodies(const AMQBody& b) :  body(b), match(false) {}
+    virtual ~MatchBodies() {}
+    
+    virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); }
+    virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); }
+    virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); }
+    virtual void visit(const AMQMethodBody& x)  {
+        const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body);
+        match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId());
+    }
+};
 
+}
+bool AMQBody::match(const AMQBody& a, const AMQBody& b) {
+    MatchBodies matcher(a);
+    b.accept(matcher);
+    return matcher.match;
+}
 
+}} // namespace 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQBody.h Wed Feb 13 07:39:59 2008
@@ -59,6 +59,9 @@
 
     virtual AMQMethodBody* getMethod() { return 0; }
     virtual const AMQMethodBody* getMethod() const { return 0; }
+
+    /** Match if same type and same class/method ID for methods */
+    static bool match(const AMQBody& , const AMQBody& );
 };
 
 std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Wed Feb 13 07:39:59 2008
@@ -29,6 +29,7 @@
 namespace sys {
 
 class ConnectionInputHandlerFactory;
+class ConnectionInputHandler;
 
 class Acceptor : public qpid::SharedObject<Acceptor>
 {
@@ -38,7 +39,9 @@
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
     virtual void run(ConnectionInputHandlerFactory* factory) = 0;
-    virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+    virtual ConnectionInputHandler* connect(
+        const std::string& host, int16_t port,
+        ConnectionInputHandlerFactory* factory) = 0;
 
     /** Note: this function is async-signal safe */
     virtual void shutdown() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=627484&r1=627483&r2=627484&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Wed Feb 13 07:39:59 2008
@@ -54,7 +54,10 @@
     AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
     void run(ConnectionInputHandlerFactory* factory);
-    void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
+    ConnectionInputHandler* connect(
+        const std::string& host, int16_t port,
+        ConnectionInputHandlerFactory* factory);
+
     void shutdown();
         
     uint16_t getPort() const;
@@ -188,7 +191,7 @@
     }
 }
     
-void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
 {
     Socket* socket = new Socket();//Should be deleted by handle when socket closes
     socket->connect(host, port);
@@ -209,7 +212,7 @@
         aio->queueReadBuffer(new Buff);
     }
     aio->start(poller);
-
+    return handler;
 }