You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:20:37 UTC

svn commit: r903864 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Connection.cpp Connection.h

Author: aconway
Date: Wed Jan 27 22:20:36 2010
New Revision: 903864

URL: http://svn.apache.org/viewvc?rev=903864&view=rev
Log:
In clustered broker: move construction of broker::Connections to the cluster dispatch thread.

Constructing a connection can involve sending management information so needs to be
in the cluster dispatch context.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 27 22:20:36 2010
@@ -294,14 +294,8 @@
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    QPID_LOG(info, *this << " new local connection " << c->getId());
-    localConnections.insert(c);
     assert(c->getId().getMember() == self);
-    // Announce the connection to the cluster.
-    if (c->isLocalClient())
-        mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(),
-                                                         c->getBrokerConnection().getSSF() ),
-                           c->getId());
+    localConnections.insert(c);
 }
 
 // Called in connection thread to insert an updated shadow connection.
@@ -497,22 +491,18 @@
     if (i != connections.end()) return i->second;
     ConnectionPtr cp;
     // If the frame is an announcement for a new connection, add it.
-    if (e.frame.getBody() && e.frame.getMethod() &&
-        e.frame.getMethod()->isA<ClusterConnectionAnnounceBody>())
+    const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
+    if (e.frame.getBody() && e.frame.getMethod() && announce)
     {
         if (id.getMember() == self)  { // Announces one of my own
             cp = localConnections.getErase(id);
-            assert(cp); 
+            assert(cp);
         }
         else {              // New remote connection, create a shadow.
             std::ostringstream mgmtId;
-            unsigned int ssf;
-            const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
-
+            unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
             mgmtId << id;
-            ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
-            QPID_LOG(debug, *this << "new connection's ssf =" << ssf );
-            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf );
+            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
         }
         connections.insert(ConnectionMap::value_type(id, cp));
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan 27 22:20:36 2010
@@ -37,6 +37,7 @@
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
@@ -73,42 +74,63 @@
 
 
 // Shadow connection
-    Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
-                           const ConnectionId& id, unsigned int ssf)
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+                       const ConnectionId& id, unsigned int ssf)
     : cluster(c), self(id), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false),
+      connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+      expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
       consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{}
 
 // Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& logId, MemberId member,
                        bool isCatchUp, bool isLink, unsigned int ssf
 ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
-    connection(&output, cluster.getBroker(),
-               isCatchUp ? shadowPrefix+logId : logId,
-               ssf,
-               isLink,
-               isCatchUp ? ++catchUpId : 0),
-    expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
+    connectionCtor(&output, cluster.getBroker(),
+                   isCatchUp ? shadowPrefix+logId : logId,
+                   ssf,
+                   isLink,
+                   isCatchUp ? ++catchUpId : 0),
+    expectProtocolHeader(isLink),
+    mcastFrameHandler(cluster.getMulticast(), self),
     consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{
+    cluster.addLocalConnection(this);
+    if (isLocalClient()) {
+        // Local clients are announced to the cluster
+        // and initialized when the announce is received.
+        QPID_LOG(info, "new client connection " << *this);
+        giveReadCredit(cluster.getSettings().readMax);
+        cluster.getMulticast().mcastControl(
+            ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+    }
+    else {
+        // Catch-up connections initialized immediately.
+        assert(catchUp);
+        QPID_LOG(info, "new catch-up connection " << *this);
+        init();
+    }
+}
 
 void Connection::init() {
-    QPID_LOG(debug, cluster << " new connection: " << *this);
+    connection = connectionCtor.construct();
+    QPID_LOG(debug, cluster << " initialized connection: " << *this
+             << " ssf=" << connection->getSSF());
     if (isLocalClient()) {  
-        connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
-        cluster.addLocalConnection(this);
-        giveReadCredit(cluster.getSettings().readMax);
+        // Actively send cluster-order frames from local node
+        connection->setClusterOrderOutput(mcastFrameHandler);
     }
-    else {                                                  // Shadow or catch-up connection
-        connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
-        connection.setClientThrottling(false);              // Disable client throttling, done by active node.
-        connection.setShadow(); // Mark the broker connection as a shadow.
+    else {                      // Shadow or catch-up connection
+        // Passive, discard cluster-order frames
+        connection->setClusterOrderOutput(nullFrameHandler);
+        // Disable client throttling, done by active node.
+        connection->setClientThrottling(false);
+        connection->setShadow(); // Mark the connection as a shadow.
     }
     if (!isCatchUp())
-        connection.setErrorListener(this);
+        connection->setErrorListener(this);
 }
 
 void Connection::giveReadCredit(int credit) {
@@ -116,8 +138,13 @@
         output.giveReadCredit(credit);
 }
 
+void Connection::announce(uint32_t ssf) {
+    assert(ssf == connectionCtor.ssf);
+    init();
+}
+
 Connection::~Connection() {
-    connection.setErrorListener(0);
+    if (connection.get()) connection->setErrorListener(0);
     QPID_LOG(debug, cluster << " deleted connection: " << *this);
 }
 
@@ -131,14 +158,15 @@
     if (isLocal()) {            // Local catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
-            connection.received(f);
+
+            connection->received(f);
     }
     else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
             if (isShadow()) 
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
-            connection.getOutput().send(ok);
+            connection->getOutput().send(ok);
             output.closeOutput();
             catchUp = false;
         }
@@ -155,7 +183,7 @@
         }
     }
     if (!message.empty())
-        connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+        connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
     return !message.empty();
 }
 
@@ -177,9 +205,9 @@
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
         if (f.type == DATA) // incoming data frames to broker::Connection
-            connection.received(const_cast<AMQFrame&>(f.frame)); 
+            connection->received(const_cast<AMQFrame&>(f.frame));
         else {           // frame control, send frame via SessionState
-            broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
+            broker::SessionState* ss = connection->getChannel(currentChannel).getSession();
             if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
         }
     }
@@ -194,7 +222,7 @@
         }
         else if (isUpdated()) {
             QPID_LOG(debug, cluster << " closed update connection " << *this);
-            connection.closed();
+            connection->closed();
         }
         else if (isLocal()) {
             QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
@@ -213,13 +241,13 @@
 // Self-delivery of close message, close the connection.
 void Connection::deliverClose () {
     assert(!catchUp);
-    connection.closed();
+    connection->closed();
     cluster.erase(self);
 }
 
 // The connection has been killed for misbehaving
 void Connection::abort() {
-    connection.abort();
+    if (connection.get()) connection->abort();
     cluster.erase(self);
 }
 
@@ -257,7 +285,7 @@
 }
 
 broker::SessionState& Connection::sessionState() {
-    return *connection.getChannel(currentChannel).getSession();
+    return *connection->getChannel(currentChannel).getSession();
 }
 
 broker::SemanticState& Connection::semanticState() {
@@ -294,26 +322,26 @@
         receivedIncomplete);
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
     // The output tasks will be added later in the update process. 
-    connection.getOutputTasks().removeAll();
+    connection->getOutputTasks().removeAll();
 }
 
 void Connection::outputTask(uint16_t channel, const std::string& name) {
-    broker::SessionState* session = connection.getChannel(channel).getSession();
+    broker::SessionState* session = connection->getChannel(channel).getSession();
     if (!session)
         throw Exception(QPID_MSG(cluster << " channel not attached " << *this
                                  << "[" <<  channel << "] "));
     OutputTask* task = &session->getSemanticState().find(name);
-    connection.getOutputTasks().addOutputTask(task);
+    connection->getOutputTasks().addOutputTask(task);
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
     ConnectionId shadowId = ConnectionId(memberId, connectionId);
     QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
     self = shadowId;
-    connection.setUserId(username);
+    connection->setUserId(username);
     // OK to use decoder here because cluster is stalled for update.
     cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
-    connection.setErrorListener(this);
+    connection->setErrorListener(this);
     output.setSendMax(sendMax);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan 27 22:20:36 2010
@@ -73,7 +73,7 @@
     ~Connection();
     
     ConnectionId getId() const { return self; }
-    broker::Connection& getBrokerConnection() { return connection; }
+    broker::Connection& getBrokerConnection() { return *connection; }
 
     /** Local connections may be clients or catch-up connections */
     bool isLocal() const;
@@ -95,8 +95,8 @@
     void received(framing::AMQFrame&);
     void closed();
     bool doOutput();
-    void idleOut() { connection.idleOut(); }
-    void idleIn() { connection.idleIn(); }
+    void idleOut() { if (connection.get()) connection->idleOut(); }
+    void idleIn() { if (connection.get()) connection->idleIn(); }
 
     // ConnectionCodec methods - called by IO layer with a read buffer.
     size_t decode(const char* buffer, size_t size);
@@ -156,7 +156,7 @@
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    void announce(uint32_t) {}  // handled by Cluster.
+    void announce(uint32_t ssf);
     void abort();
     void deliverClose();
 
@@ -165,11 +165,36 @@
     void addQueueListener(const std::string& queue, uint32_t listener);
     void managementSchema(const std::string& data);
 
+    uint32_t getSsf() const { return connectionCtor.ssf; }
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
     };
-    
+
+    // Arguments to construct a broker::Connection
+    struct ConnectionCtor {
+        sys::ConnectionOutputHandler* out;
+        broker::Broker& broker;
+        std::string mgmtId;
+        unsigned int ssf;
+        bool isLink;
+        uint64_t objectId;
+
+        ConnectionCtor(
+            sys::ConnectionOutputHandler* out_,
+            broker::Broker& broker_,
+            const std::string& mgmtId_,
+            unsigned int ssf_,
+            bool isLink_=false,
+            uint64_t objectId_=0
+        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {}
+
+        std::auto_ptr<broker::Connection> construct() {
+            return std::auto_ptr<broker::Connection>(
+                new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId));
+        }
+    };
 
     static NullFrameHandler nullFrameHandler;
 
@@ -191,7 +216,8 @@
     bool catchUp;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
-    broker::Connection connection;
+    ConnectionCtor connectionCtor;
+    std::auto_ptr<broker::Connection> connection;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org