You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Alan Conway <ac...@redhat.com> on 2010/02/11 19:36:58 UTC

Re: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/]

Apologies for late reply, accident with mail filters and I missed the mail.

> This commit broke the Java Build by altering the management-schema.xml
> which is used commonly between the two brokers to generate QMF
> classes.
>
> Moreover, prior to this commit there was no JIRA or discussion on the
> list about a possible change... nor does the commit itself do much to
> explain what the change means (although since it relates to clusters,
> I can take a fair stab at guessing the answer to "is a connection on
> the Java Broker a shadow connection?" is false.)

Mea culpa, I simply did not think about the Java broker. This change did deserve 
more explanation in form of JIRA.

> As I discussed in e-mails at the beginning of the year, I think we
> should be aiming at keeping the broker functionality as close a
> possible between the two brokers.  This requires all of us to discuss
> upcoming proposed changes to things like management-schema.xml before
> we commit them so that both communities can have there say, and we can
> co-ordinate changes so neither community breaks the other's build.

Agreed. Oversight on my part.

> it is designed around the limitations of the C++ broker wrt Virtual
> Hosts (there can be only one) and transports/ports (similarly
> restricted).  However, while I would like to rectify this before our
> next release... I will be raising a JIRA and expect to discuss the
> impact and co-ordinate timing through the list... That is unless
> people would rather I just commit the change... and let the C++ guys
> work out how to fix their build after I break it ;-)
>
> BTW thanks to Rajith for putting in an emergency fix to the Java
> codebase to make it compile again...
>

If its still a serious problem for the Java builds then we can revert my commit 
- let me know.
Lets discuss the right approach on: https://issues.apache.org/jira/browse/QPID-2403

I'm on vacation from tomorrow for a week so feel free to revert my commit if 
that's necessary and I haven't done so before I go.


> -- Rob
>
> ---------- Forwarded message ----------
> From:  <ac...@apache.org>
> Date: 6 February 2010 00:02
> Subject: svn commit: r907123 - in /qpid/trunk/qpid:
> cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/
> specs/
> To: commits@qpid.apache.org
>
>
> Author: aconway
> Date: Fri Feb  5 23:02:45 2010
> New Revision: 907123
>
> URL: http://svn.apache.org/viewvc?rev=907123&view=rev
> Log:
> Consistent connection names across a cluster.
>
> - use the same host:port for connections and their shadows.
> - add shadow property to managment connection to identify shadows.
> - updated qpid-stat and qpid-cluster to filter on shadow property.
>
> Modified:
>    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
>    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
>    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
>    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
>    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
>    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
>    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
>    qpid/trunk/qpid/cpp/xml/cluster.xml
>    qpid/trunk/qpid/python/commands/qpid-cluster
>    qpid/trunk/qpid/python/commands/qpid-stat
>    qpid/trunk/qpid/specs/management-schema.xml
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb  5 23:02:45 2010
> @@ -72,7 +72,7 @@
>     }
>  };
>
> -Connection::Connection(ConnectionOutputHandler* out_, Broker&
> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
> uint64_t objectId) :
> +Connection::Connection(ConnectionOutputHandler* out_, Broker&
> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
> uint64_t objectId, bool shadow_) :
>     ConnectionState(out_, broker_),
>     ssf(ssf),
>     adapter(*this, isLink_),
> @@ -84,7 +84,7 @@
>     agent(0),
>     timer(broker_.getTimer()),
>     errorListener(0),
> -    shadow(false)
> +    shadow(shadow_)
>  {
>     Manageable* parent = broker.GetVhostObject();
>
> @@ -95,10 +95,10 @@
>     {
>         agent = broker_.getManagementAgent();
>
> -
>         // TODO set last bool true if system connection
>         if (agent != 0) {
>             mgmtObject = new _qmf::Connection(agent, this, parent,
> mgmtId, !isLink, false);
> +            mgmtObject->set_shadow(shadow);
>             agent->addObject(mgmtObject, objectId, true);
>         }
>         ConnectionState::setUrl(mgmtId);
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb  5 23:02:45 2010
> @@ -79,7 +79,7 @@
>     };
>
>     Connection(sys::ConnectionOutputHandler* out, Broker& broker,
> const std::string& mgmtId, unsigned int ssf,
> -               bool isLink = false, uint64_t objectId = 0);
> +               bool isLink = false, uint64_t objectId = 0, bool shadow=false);
>     ~Connection ();
>
>     /** Get the SessionHandler for channel. Create if it does not
> already exist */
> @@ -132,8 +132,6 @@
>
>     /** True if this is a shadow connection in a cluster. */
>     bool isShadow() { return shadow; }
> -    /** Called by cluster to mark shadow connections */
> -    void setShadow() { shadow = true; }
>
>     // Used by cluster to update connection status
>     sys::AggregateOutput& getOutputTasks() { return outputTasks; }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  5 23:02:45 2010
> @@ -509,10 +509,8 @@
>             assert(cp);
>         }
>         else {              // New remote connection, create a shadow.
> -            std::ostringstream mgmtId;
>             unsigned int ssf = (announce && announce->hasSsf()) ?
> announce->getSsf() : 0;
> -            mgmtId << id;
> -            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
> +            cp = new Connection(*this, shadowOut,
> announce->getManagementId(), id, ssf);
>         }
>         connections.insert(ConnectionMap::value_type(id, cp));
>     }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb  5 23:02:45 2010
> @@ -23,6 +23,7 @@
>  #include "Cluster.h"
>  #include "UpdateReceiver.h"
>
> +#include "qpid/assert.h"
>  #include "qpid/broker/SessionState.h"
>  #include "qpid/broker/SemanticState.h"
>  #include "qpid/broker/TxBuffer.h"
> @@ -74,28 +75,30 @@
>
>
>  // Shadow connection
> -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> const std::string& logId,
> +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> +                       const std::string& mgmtId,
>                        const ConnectionId& id, unsigned int ssf)
>     : cluster(c), self(id), catchUp(false), output(*this, out),
> -      connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
> +      connectionCtor(&output, cluster.getBroker(), mgmtId, ssf,
> false, 0, true),
>       expectProtocolHeader(false),
>       mcastFrameHandler(cluster.getMulticast(), self),
> -      consumerNumbering(c.getUpdateReceiver().consumerNumbering)
> +      updateIn(c.getUpdateReceiver())
>  {}
>
>  // Local connection
>  Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> -                       const std::string& logId, MemberId member,
> +                       const std::string& mgmtId, MemberId member,
>                        bool isCatchUp, bool isLink, unsigned int ssf
>  ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp),
> output(*this, out),
>     connectionCtor(&output, cluster.getBroker(),
> -                   isCatchUp ? shadowPrefix+logId : logId,
> +                   mgmtId,
>                    ssf,
>                    isLink,
> -                   isCatchUp ? ++catchUpId : 0),
> +                   isCatchUp ? ++catchUpId : 0,
> +                   isCatchUp),  // isCatchUp => shadow
>     expectProtocolHeader(isLink),
>     mcastFrameHandler(cluster.getMulticast(), self),
> -    consumerNumbering(c.getUpdateReceiver().consumerNumbering)
> +    updateIn(c.getUpdateReceiver())
>  {
>     cluster.addLocalConnection(this);
>     if (isLocalClient()) {
> @@ -104,12 +107,14 @@
>         QPID_LOG(info, "new client connection " << *this);
>         giveReadCredit(cluster.getSettings().readMax);
>         cluster.getMulticast().mcastControl(
> -            ClusterConnectionAnnounceBody(ProtocolVersion(),
> getSsf()), getId());
> +            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
> getSsf()), getId());
>     }
>     else {
> -        // Catch-up connections initialized immediately.
> +        // Catch-up shadow connections initialized using nextShadow id.
>         assert(catchUp);
>         QPID_LOG(info, "new catch-up connection " << *this);
> +        connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
> +        updateIn.nextShadowMgmtId.clear();
>         init();
>     }
>  }
> @@ -127,7 +132,6 @@
>         connection->setClusterOrderOutput(nullFrameHandler);
>         // Disable client throttling, done by active node.
>         connection->setClientThrottling(false);
> -        connection->setShadow(); // Mark the connection as a shadow.
>     }
>     if (!isCatchUp())
>         connection->setErrorListener(this);
> @@ -138,8 +142,9 @@
>         output.giveReadCredit(credit);
>  }
>
> -void Connection::announce(uint32_t ssf) {
> -    assert(ssf == connectionCtor.ssf);
> +void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
> +    QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
> +    QPID_ASSERT(ssf == connectionCtor.ssf);
>     init();
>  }
>
> @@ -296,13 +301,17 @@
>     return sessionState().getSemanticState();
>  }
>
> +void Connection::shadowPrepare(const std::string& mgmtId) {
> +    updateIn.nextShadowMgmtId = mgmtId;
> +}
> +
>  void Connection::consumerState(const string& name, bool blocked, bool
> notifyEnabled, const SequenceNumber& position)
>  {
>     broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
>     c.position = position;
>     c.setBlocked(blocked);
>     if (notifyEnabled) c.enableNotify(); else c.disableNotify();
> -    consumerNumbering.add(c.shared_from_this());
> +    updateIn.consumerNumbering.add(c.shared_from_this());
>  }
>
>
> @@ -337,10 +346,15 @@
>     OutputTask* task = &session->getSemanticState().find(name);
>     connection->getOutputTasks().addOutputTask(task);
>  }
> -
> -void Connection::shadowReady(uint64_t memberId, uint64_t
> connectionId, const string& username, const string& fragment, uint32_t
> sendMax) {
> +
> +void Connection::shadowReady(
> +    uint64_t memberId, uint64_t connectionId, const string& mgmtId,
> +    const string& username, const string& fragment, uint32_t sendMax)
> +{
> +    QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
>     ConnectionId shadowId = ConnectionId(memberId, connectionId);
> -    QPID_LOG(debug, cluster << " catch-up connection " << *this << "
> becomes shadow " << shadowId);
> +    QPID_LOG(debug, cluster << " catch-up connection " << *this
> +             << " becomes shadow " << shadowId);
>     self = shadowId;
>     connection->setUserId(username);
>     // OK to use decoder here because cluster is stalled for update.
> @@ -355,7 +369,7 @@
>  {
>     QPID_LOG(debug, cluster << " incoming update complete on
> connection " << *this);
>     cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
> -    consumerNumbering.clear();
> +    updateIn.consumerNumbering.clear();
>     self.second = 0;        // Mark this as completed update connection.
>  }
>
> @@ -503,9 +517,9 @@
>  }
>
>  void Connection::addQueueListener(const std::string& q, uint32_t listener) {
> -    if (listener >= consumerNumbering.size())
> +    if (listener >= updateIn.consumerNumbering.size())
>         throw Exception(QPID_MSG("Invalid listener ID: " << listener));
> -    findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
> +    findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
>  }
>
>  void Connection::managementSchema(const std::string& data) {
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb  5 23:02:45 2010
> @@ -65,10 +65,10 @@
>   public:
>
>     /** Local connection. */
> -    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& logId, MemberId, bool catchUp, bool isLink,
> +    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& mgmtId, MemberId, bool catchUp, bool isLink,
>                unsigned int ssf);
>     /** Shadow connection. */
> -    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& logId, const ConnectionId& id,
> +    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& mgmtId, const ConnectionId& id,
>                unsigned int ssf);
>     ~Connection();
>
> @@ -109,6 +109,8 @@
>     // ==== Used in catch-up mode to build initial state.
>     //
>     // State update methods.
> +    void shadowPrepare(const std::string&);
> +
>     void sessionState(const framing::SequenceNumber& replayStart,
>                       const framing::SequenceNumber& sendCommandPoint,
>                       const framing::SequenceSet& sentIncomplete,
> @@ -119,7 +121,12 @@
>
>     void outputTask(uint16_t channel, const std::string& name);
>
> -    void shadowReady(uint64_t memberId, uint64_t connectionId, const
> std::string& username, const std::string& fragment, uint32_t sendMax);
> +    void shadowReady(uint64_t memberId,
> +                     uint64_t connectionId,
> +                     const std::string& managementId,
> +                     const std::string& username,
> +                     const std::string& fragment,
> +                     uint32_t sendMax);
>
>     void membership(const framing::FieldTable&, const framing::FieldTable&,
>                     const framing::SequenceNumber& frameSeq,
> @@ -156,7 +163,7 @@
>     void exchange(const std::string& encoded);
>
>     void giveReadCredit(int credit);
> -    void announce(uint32_t ssf);
> +    void announce(const std::string& mgmtId, uint32_t ssf);
>     void abort();
>     void deliverClose();
>
> @@ -182,6 +189,7 @@
>         unsigned int ssf;
>         bool isLink;
>         uint64_t objectId;
> +        bool shadow;
>
>         ConnectionCtor(
>             sys::ConnectionOutputHandler* out_,
> @@ -189,12 +197,15 @@
>             const std::string& mgmtId_,
>             unsigned int ssf_,
>             bool isLink_=false,
> -            uint64_t objectId_=0
> -        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
> isLink(isLink_), objectId(objectId_) {}
> +            uint64_t objectId_=0,
> +            bool shadow_=false
> +        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
> +            isLink(isLink_), objectId(objectId_), shadow(shadow_)
> +        {}
>
>         std::auto_ptr<broker::Connection> construct() {
>             return std::auto_ptr<broker::Connection>(
> -                new broker::Connection(out, broker, mgmtId, ssf,
> isLink, objectId));
> +                new broker::Connection(out, broker, mgmtId, ssf,
> isLink, objectId, shadow));
>         }
>     };
>
> @@ -225,7 +236,7 @@
>     boost::shared_ptr<broker::TxBuffer> txBuffer;
>     bool expectProtocolHeader;
>     McastFrameHandler mcastFrameHandler;
> -    UpdateReceiver::ConsumerNumbering& consumerNumbering;
> +    UpdateReceiver& updateIn;
>
>     static qpid::sys::AtomicValue<uint64_t> catchUpId;
>
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb  5
> 23:02:45 2010
> @@ -57,6 +57,7 @@
>  #include <boost/bind.hpp>
>  #include <boost/cast.hpp>
>  #include <algorithm>
> +#include <sstream>
>
>  namespace qpid {
>  namespace cluster {
> @@ -148,7 +149,7 @@
>     ClusterConnectionProxy(session).expiryId(expiry.getId());
>
>     updateManagementAgent();
> -
> +
>     ClusterConnectionMembershipBody membership;
>     map.toMethodBody(membership);
>     AMQFrame frame(membership);
> @@ -328,6 +329,14 @@
>
>  void UpdateClient::updateConnection(const
> boost::intrusive_ptr<Connection>& updateConnection) {
>     QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
> +
> +    // Send the management ID first on the main connection.
> +    std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
> +    ClusterConnectionProxy(session).shadowPrepare(mgmtId);
> +    // Make sure its received before opening shadow connection
> +    session.sync();
> +
> +    // Open shadow connection and update it.
>     shadowConnection = catchUpConnection();
>
>     broker::Connection& bc = updateConnection->getBrokerConnection();
> @@ -341,6 +350,7 @@
>     ClusterConnectionProxy(shadowConnection).shadowReady(
>         updateConnection->getId().getMember(),
>         updateConnection->getId().getNumber(),
> +        bc.getMgmtId(),
>         bc.getUserId(),
>         string(fragment.first, fragment.second),
>         updateConnection->getOutput().getSendMax()
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb  5
> 23:02:45 2010
> @@ -36,6 +36,9 @@
>     /** Numbering used to identify Queue listeners as consumers */
>     typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl>
>> > ConsumerNumbering;
>     ConsumerNumbering consumerNumbering;
> +
> +    /** Management-id for the next shadow connection */
> +    std::string nextShadowMgmtId;
>  };
>  }} // namespace qpid::cluster
>
>
> Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
> +++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb  5 23:02:45 2010
> @@ -117,6 +117,7 @@
>
>     <!-- Announce a new connection -->
>     <control name="announce" code="0x1">
> +      <field name="management-id" type="str16"/>
>       <!-- Security Strength Factor (ssf): if the transport provides
>       encryption (e.g. ssl), ssf is the bit length of the key.  Zero if no
>       encryption provided. -->
> @@ -135,13 +136,18 @@
>     <control name="abort" code="0x4"/>
>
>     <!-- Update controls. Sent to a new broker in joining mode.
> -        A connection is updateed as followed:
> -        - open as a normal connection.
> +        A connection is updated as followed:
> +        - send the shadow's management ID in shadow-perpare on the
> update connection
> +        - open the shadow as a normal connection.
>         - attach sessions, create consumers, set flow with normal AMQP
> cokmmands.
>         - send /reset additional session state with controls below.
>         - send shadow-ready to mark end of shadow update.
>         - send membership when entire update is complete.
>     -->
> +    <!-- Prepare to send a shadow connection with the given ID. -->
> +    <control name="shadow-prepare" code="0x0F">
> +      <field name="management-id" type="str16"/>
> +    </control>
>
>     <!-- Consumer state that cannot be set by standard AMQP controls. -->
>     <control name="consumer-state" code="0x10">
> @@ -202,6 +208,7 @@
>     <control name="shadow-ready" code="0x20" label="End of shadow
> connection update.">
>       <field name="member-id" type="uint64"/>
>       <field name="connection-id" type="uint64"/>
> +      <field name="management-id" type="str16"/>
>       <field name="user-name" type="str8"/>
>       <field name="fragment" type="str32"/>
>       <field name="send-max" type="uint32"/>
>
> Modified: qpid/trunk/qpid/python/commands/qpid-cluster
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/python/commands/qpid-cluster (original)
> +++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb  5 23:02:45 2010
> @@ -193,7 +193,6 @@
>         self.qmf.delBroker(self.broker)
>         self.broker = None
>         self.brokers = []
> -        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>
>         idx = 0
>         for host in hostList:
> @@ -209,7 +208,7 @@
>                 print "Clients on Member: ID=%s:" % displayList[idx]
>             connList = self.qmf.getObjects(_class="connection",
> _package="org.apache.qpid.broker", _broker=broker)
>             for conn in connList:
> -                if pattern.match(conn.address):
> +                if not conn.shadow:
>                     if self.config._numeric or self.config._delConn:
>                         a = conn.address
>                     else:
>
> Modified: qpid/trunk/qpid/python/commands/qpid-stat
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/python/commands/qpid-stat (original)
> +++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb  5 23:02:45 2010
> @@ -34,7 +34,6 @@
>  _limit = 50
>  _increasing = False
>  _sortcol = None
> -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>
>  def Usage ():
>     print "Usage:  qpid-stat [OPTIONS] [broker-addr]"
> @@ -108,7 +107,7 @@
>
>         list = qmf.getObjects(_class="connection", _package=package,
> _agent=self.brokerAgent)
>         for conn in list:
> -            if pattern.match(conn.address):
> +            if not conn.shadow:
>                 self.connections[conn.getObjectId()] = conn
>
>         list = qmf.getObjects(_class="session", _package=package,
> _agent=self.brokerAgent)
>
> Modified: qpid/trunk/qpid/specs/management-schema.xml
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/specs/management-schema.xml (original)
> +++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb  5 23:02:45 2010
> @@ -236,6 +236,7 @@
>     <property name="remoteProcessName"  type="sstr"   access="RO"
> optional="y" desc="Name of executable running as remote client"/>
>     <property name="remotePid"          type="uint32" access="RO"
> optional="y" desc="Process ID of remote client"/>
>     <property name="remoteParentPid"    type="uint32" access="RO"
> optional="y" desc="Parent Process ID of remote client"/>
> +    <property name="shadow"             type="bool"   access="RO"
> desc="True for shadow connections"/>
>     <statistic name="closing"          type="bool" desc="This client
> is closing by management request"/>
>     <statistic name="framesFromClient" type="count64"/>
>     <statistic name="framesToClient"   type="count64"/>
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/]

Posted by Robert Godfrey <ro...@gmail.com>.
On 11 February 2010 19:36, Alan Conway <ac...@redhat.com> wrote:
> Apologies for late reply, accident with mail filters and I missed the mail.
>
>> This commit broke the Java Build by altering the management-schema.xml
>> which is used commonly between the two brokers to generate QMF
>> classes.
>>
>> Moreover, prior to this commit there was no JIRA or discussion on the
>> list about a possible change... nor does the commit itself do much to
>> explain what the change means (although since it relates to clusters,
>> I can take a fair stab at guessing the answer to "is a connection on
>> the Java Broker a shadow connection?" is false.)
>
> Mea culpa, I simply did not think about the Java broker. This change did
> deserve more explanation in form of JIRA.
>
>> As I discussed in e-mails at the beginning of the year, I think we
>> should be aiming at keeping the broker functionality as close a
>> possible between the two brokers.  This requires all of us to discuss
>> upcoming proposed changes to things like management-schema.xml before
>> we commit them so that both communities can have there say, and we can
>> co-ordinate changes so neither community breaks the other's build.
>
> Agreed. Oversight on my part.
>
>> it is designed around the limitations of the C++ broker wrt Virtual
>> Hosts (there can be only one) and transports/ports (similarly
>> restricted).  However, while I would like to rectify this before our
>> next release... I will be raising a JIRA and expect to discuss the
>> impact and co-ordinate timing through the list... That is unless
>> people would rather I just commit the change... and let the C++ guys
>> work out how to fix their build after I break it ;-)
>>
>> BTW thanks to Rajith for putting in an emergency fix to the Java
>> codebase to make it compile again...
>>
>
> If its still a serious problem for the Java builds then we can revert my
> commit - let me know.

No problem - Rajith's commit stopped the build from failing though
querying the connection object via QMF on the Java Broker may make the
QMF client unwell right now :-)

> Lets discuss the right approach on:
> https://issues.apache.org/jira/browse/QPID-2403
>
> I'm on vacation from tomorrow for a week so feel free to revert my commit if
> that's necessary and I haven't done so before I go.
>

No need to revert... the change was a relatively minor one :-) I'm
(notionally) fine with having the attribute there and just having the
Java Broker always report false... It does seem to me though more of a
generic issue with managing a broker which is part of a cluster...
objects which are present in the logical "clustered" broker, but
aren't actually resident on the broker you are querying...

BTW I'm also out next week..

-- Rob

>>
>> ---------- Forwarded message ----------
>> From:  <ac...@apache.org>
>> Date: 6 February 2010 00:02
>> Subject: svn commit: r907123 - in /qpid/trunk/qpid:
>> cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/
>> specs/
>> To: commits@qpid.apache.org
>>
>>
>> Author: aconway
>> Date: Fri Feb  5 23:02:45 2010
>> New Revision: 907123
>>
>> URL: http://svn.apache.org/viewvc?rev=907123&view=rev
>> Log:
>> Consistent connection names across a cluster.
>>
>> - use the same host:port for connections and their shadows.
>> - add shadow property to managment connection to identify shadows.
>> - updated qpid-stat and qpid-cluster to filter on shadow property.
>>
>> Modified:
>>   qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
>>   qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
>>   qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
>>   qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
>>   qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
>>   qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
>>   qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
>>   qpid/trunk/qpid/cpp/xml/cluster.xml
>>   qpid/trunk/qpid/python/commands/qpid-cluster
>>   qpid/trunk/qpid/python/commands/qpid-stat
>>   qpid/trunk/qpid/specs/management-schema.xml
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb  5 23:02:45
>> 2010
>> @@ -72,7 +72,7 @@
>>    }
>>  };
>>
>> -Connection::Connection(ConnectionOutputHandler* out_, Broker&
>> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
>> uint64_t objectId) :
>> +Connection::Connection(ConnectionOutputHandler* out_, Broker&
>> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
>> uint64_t objectId, bool shadow_) :
>>    ConnectionState(out_, broker_),
>>    ssf(ssf),
>>    adapter(*this, isLink_),
>> @@ -84,7 +84,7 @@
>>    agent(0),
>>    timer(broker_.getTimer()),
>>    errorListener(0),
>> -    shadow(false)
>> +    shadow(shadow_)
>>  {
>>    Manageable* parent = broker.GetVhostObject();
>>
>> @@ -95,10 +95,10 @@
>>    {
>>        agent = broker_.getManagementAgent();
>>
>> -
>>        // TODO set last bool true if system connection
>>        if (agent != 0) {
>>            mgmtObject = new _qmf::Connection(agent, this, parent,
>> mgmtId, !isLink, false);
>> +            mgmtObject->set_shadow(shadow);
>>            agent->addObject(mgmtObject, objectId, true);
>>        }
>>        ConnectionState::setUrl(mgmtId);
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb  5 23:02:45
>> 2010
>> @@ -79,7 +79,7 @@
>>    };
>>
>>    Connection(sys::ConnectionOutputHandler* out, Broker& broker,
>> const std::string& mgmtId, unsigned int ssf,
>> -               bool isLink = false, uint64_t objectId = 0);
>> +               bool isLink = false, uint64_t objectId = 0, bool
>> shadow=false);
>>    ~Connection ();
>>
>>    /** Get the SessionHandler for channel. Create if it does not
>> already exist */
>> @@ -132,8 +132,6 @@
>>
>>    /** True if this is a shadow connection in a cluster. */
>>    bool isShadow() { return shadow; }
>> -    /** Called by cluster to mark shadow connections */
>> -    void setShadow() { shadow = true; }
>>
>>    // Used by cluster to update connection status
>>    sys::AggregateOutput& getOutputTasks() { return outputTasks; }
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  5 23:02:45
>> 2010
>> @@ -509,10 +509,8 @@
>>            assert(cp);
>>        }
>>        else {              // New remote connection, create a shadow.
>> -            std::ostringstream mgmtId;
>>            unsigned int ssf = (announce && announce->hasSsf()) ?
>> announce->getSsf() : 0;
>> -            mgmtId << id;
>> -            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
>> +            cp = new Connection(*this, shadowOut,
>> announce->getManagementId(), id, ssf);
>>        }
>>        connections.insert(ConnectionMap::value_type(id, cp));
>>    }
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb  5
>> 23:02:45 2010
>> @@ -23,6 +23,7 @@
>>  #include "Cluster.h"
>>  #include "UpdateReceiver.h"
>>
>> +#include "qpid/assert.h"
>>  #include "qpid/broker/SessionState.h"
>>  #include "qpid/broker/SemanticState.h"
>>  #include "qpid/broker/TxBuffer.h"
>> @@ -74,28 +75,30 @@
>>
>>
>>  // Shadow connection
>> -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> const std::string& logId,
>> +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> +                       const std::string& mgmtId,
>>                       const ConnectionId& id, unsigned int ssf)
>>    : cluster(c), self(id), catchUp(false), output(*this, out),
>> -      connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId,
>> ssf),
>> +      connectionCtor(&output, cluster.getBroker(), mgmtId, ssf,
>> false, 0, true),
>>      expectProtocolHeader(false),
>>      mcastFrameHandler(cluster.getMulticast(), self),
>> -      consumerNumbering(c.getUpdateReceiver().consumerNumbering)
>> +      updateIn(c.getUpdateReceiver())
>>  {}
>>
>>  // Local connection
>>  Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> -                       const std::string& logId, MemberId member,
>> +                       const std::string& mgmtId, MemberId member,
>>                       bool isCatchUp, bool isLink, unsigned int ssf
>>  ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp),
>> output(*this, out),
>>    connectionCtor(&output, cluster.getBroker(),
>> -                   isCatchUp ? shadowPrefix+logId : logId,
>> +                   mgmtId,
>>                   ssf,
>>                   isLink,
>> -                   isCatchUp ? ++catchUpId : 0),
>> +                   isCatchUp ? ++catchUpId : 0,
>> +                   isCatchUp),  // isCatchUp => shadow
>>    expectProtocolHeader(isLink),
>>    mcastFrameHandler(cluster.getMulticast(), self),
>> -    consumerNumbering(c.getUpdateReceiver().consumerNumbering)
>> +    updateIn(c.getUpdateReceiver())
>>  {
>>    cluster.addLocalConnection(this);
>>    if (isLocalClient()) {
>> @@ -104,12 +107,14 @@
>>        QPID_LOG(info, "new client connection " << *this);
>>        giveReadCredit(cluster.getSettings().readMax);
>>        cluster.getMulticast().mcastControl(
>> -            ClusterConnectionAnnounceBody(ProtocolVersion(),
>> getSsf()), getId());
>> +            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
>> getSsf()), getId());
>>    }
>>    else {
>> -        // Catch-up connections initialized immediately.
>> +        // Catch-up shadow connections initialized using nextShadow id.
>>        assert(catchUp);
>>        QPID_LOG(info, "new catch-up connection " << *this);
>> +        connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
>> +        updateIn.nextShadowMgmtId.clear();
>>        init();
>>    }
>>  }
>> @@ -127,7 +132,6 @@
>>        connection->setClusterOrderOutput(nullFrameHandler);
>>        // Disable client throttling, done by active node.
>>        connection->setClientThrottling(false);
>> -        connection->setShadow(); // Mark the connection as a shadow.
>>    }
>>    if (!isCatchUp())
>>        connection->setErrorListener(this);
>> @@ -138,8 +142,9 @@
>>        output.giveReadCredit(credit);
>>  }
>>
>> -void Connection::announce(uint32_t ssf) {
>> -    assert(ssf == connectionCtor.ssf);
>> +void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
>> +    QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
>> +    QPID_ASSERT(ssf == connectionCtor.ssf);
>>    init();
>>  }
>>
>> @@ -296,13 +301,17 @@
>>    return sessionState().getSemanticState();
>>  }
>>
>> +void Connection::shadowPrepare(const std::string& mgmtId) {
>> +    updateIn.nextShadowMgmtId = mgmtId;
>> +}
>> +
>>  void Connection::consumerState(const string& name, bool blocked, bool
>> notifyEnabled, const SequenceNumber& position)
>>  {
>>    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
>>    c.position = position;
>>    c.setBlocked(blocked);
>>    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
>> -    consumerNumbering.add(c.shared_from_this());
>> +    updateIn.consumerNumbering.add(c.shared_from_this());
>>  }
>>
>>
>> @@ -337,10 +346,15 @@
>>    OutputTask* task = &session->getSemanticState().find(name);
>>    connection->getOutputTasks().addOutputTask(task);
>>  }
>> -
>> -void Connection::shadowReady(uint64_t memberId, uint64_t
>> connectionId, const string& username, const string& fragment, uint32_t
>> sendMax) {
>> +
>> +void Connection::shadowReady(
>> +    uint64_t memberId, uint64_t connectionId, const string& mgmtId,
>> +    const string& username, const string& fragment, uint32_t sendMax)
>> +{
>> +    QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
>>    ConnectionId shadowId = ConnectionId(memberId, connectionId);
>> -    QPID_LOG(debug, cluster << " catch-up connection " << *this << "
>> becomes shadow " << shadowId);
>> +    QPID_LOG(debug, cluster << " catch-up connection " << *this
>> +             << " becomes shadow " << shadowId);
>>    self = shadowId;
>>    connection->setUserId(username);
>>    // OK to use decoder here because cluster is stalled for update.
>> @@ -355,7 +369,7 @@
>>  {
>>    QPID_LOG(debug, cluster << " incoming update complete on
>> connection " << *this);
>>    cluster.updateInDone(ClusterMap(joiners, members, frameSeq,
>> configSeq));
>> -    consumerNumbering.clear();
>> +    updateIn.consumerNumbering.clear();
>>    self.second = 0;        // Mark this as completed update connection.
>>  }
>>
>> @@ -503,9 +517,9 @@
>>  }
>>
>>  void Connection::addQueueListener(const std::string& q, uint32_t
>> listener) {
>> -    if (listener >= consumerNumbering.size())
>> +    if (listener >= updateIn.consumerNumbering.size())
>>        throw Exception(QPID_MSG("Invalid listener ID: " << listener));
>> -
>>  findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
>> +
>>  findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
>>  }
>>
>>  void Connection::managementSchema(const std::string& data) {
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb  5 23:02:45
>> 2010
>> @@ -65,10 +65,10 @@
>>  public:
>>
>>    /** Local connection. */
>> -    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& logId, MemberId, bool catchUp, bool isLink,
>> +    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& mgmtId, MemberId, bool catchUp, bool isLink,
>>               unsigned int ssf);
>>    /** Shadow connection. */
>> -    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& logId, const ConnectionId& id,
>> +    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& mgmtId, const ConnectionId& id,
>>               unsigned int ssf);
>>    ~Connection();
>>
>> @@ -109,6 +109,8 @@
>>    // ==== Used in catch-up mode to build initial state.
>>    //
>>    // State update methods.
>> +    void shadowPrepare(const std::string&);
>> +
>>    void sessionState(const framing::SequenceNumber& replayStart,
>>                      const framing::SequenceNumber& sendCommandPoint,
>>                      const framing::SequenceSet& sentIncomplete,
>> @@ -119,7 +121,12 @@
>>
>>    void outputTask(uint16_t channel, const std::string& name);
>>
>> -    void shadowReady(uint64_t memberId, uint64_t connectionId, const
>> std::string& username, const std::string& fragment, uint32_t sendMax);
>> +    void shadowReady(uint64_t memberId,
>> +                     uint64_t connectionId,
>> +                     const std::string& managementId,
>> +                     const std::string& username,
>> +                     const std::string& fragment,
>> +                     uint32_t sendMax);
>>
>>    void membership(const framing::FieldTable&, const framing::FieldTable&,
>>                    const framing::SequenceNumber& frameSeq,
>> @@ -156,7 +163,7 @@
>>    void exchange(const std::string& encoded);
>>
>>    void giveReadCredit(int credit);
>> -    void announce(uint32_t ssf);
>> +    void announce(const std::string& mgmtId, uint32_t ssf);
>>    void abort();
>>    void deliverClose();
>>
>> @@ -182,6 +189,7 @@
>>        unsigned int ssf;
>>        bool isLink;
>>        uint64_t objectId;
>> +        bool shadow;
>>
>>        ConnectionCtor(
>>            sys::ConnectionOutputHandler* out_,
>> @@ -189,12 +197,15 @@
>>            const std::string& mgmtId_,
>>            unsigned int ssf_,
>>            bool isLink_=false,
>> -            uint64_t objectId_=0
>> -        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
>> isLink(isLink_), objectId(objectId_) {}
>> +            uint64_t objectId_=0,
>> +            bool shadow_=false
>> +        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
>> +            isLink(isLink_), objectId(objectId_), shadow(shadow_)
>> +        {}
>>
>>        std::auto_ptr<broker::Connection> construct() {
>>            return std::auto_ptr<broker::Connection>(
>> -                new broker::Connection(out, broker, mgmtId, ssf,
>> isLink, objectId));
>> +                new broker::Connection(out, broker, mgmtId, ssf,
>> isLink, objectId, shadow));
>>        }
>>    };
>>
>> @@ -225,7 +236,7 @@
>>    boost::shared_ptr<broker::TxBuffer> txBuffer;
>>    bool expectProtocolHeader;
>>    McastFrameHandler mcastFrameHandler;
>> -    UpdateReceiver::ConsumerNumbering& consumerNumbering;
>> +    UpdateReceiver& updateIn;
>>
>>    static qpid::sys::AtomicValue<uint64_t> catchUpId;
>>
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb  5
>> 23:02:45 2010
>> @@ -57,6 +57,7 @@
>>  #include <boost/bind.hpp>
>>  #include <boost/cast.hpp>
>>  #include <algorithm>
>> +#include <sstream>
>>
>>  namespace qpid {
>>  namespace cluster {
>> @@ -148,7 +149,7 @@
>>    ClusterConnectionProxy(session).expiryId(expiry.getId());
>>
>>    updateManagementAgent();
>> -
>> +
>>    ClusterConnectionMembershipBody membership;
>>    map.toMethodBody(membership);
>>    AMQFrame frame(membership);
>> @@ -328,6 +329,14 @@
>>
>>  void UpdateClient::updateConnection(const
>> boost::intrusive_ptr<Connection>& updateConnection) {
>>    QPID_LOG(debug, updaterId << " updating connection " <<
>> *updateConnection);
>> +
>> +    // Send the management ID first on the main connection.
>> +    std::string mgmtId =
>> updateConnection->getBrokerConnection().getMgmtId();
>> +    ClusterConnectionProxy(session).shadowPrepare(mgmtId);
>> +    // Make sure its received before opening shadow connection
>> +    session.sync();
>> +
>> +    // Open shadow connection and update it.
>>    shadowConnection = catchUpConnection();
>>
>>    broker::Connection& bc = updateConnection->getBrokerConnection();
>> @@ -341,6 +350,7 @@
>>    ClusterConnectionProxy(shadowConnection).shadowReady(
>>        updateConnection->getId().getMember(),
>>        updateConnection->getId().getNumber(),
>> +        bc.getMgmtId(),
>>        bc.getUserId(),
>>        string(fragment.first, fragment.second),
>>        updateConnection->getOutput().getSendMax()
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb  5
>> 23:02:45 2010
>> @@ -36,6 +36,9 @@
>>    /** Numbering used to identify Queue listeners as consumers */
>>    typedef
>> Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl>
>>>
>>> > ConsumerNumbering;
>>
>>    ConsumerNumbering consumerNumbering;
>> +
>> +    /** Management-id for the next shadow connection */
>> +    std::string nextShadowMgmtId;
>>  };
>>  }} // namespace qpid::cluster
>>
>>
>> Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
>> +++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb  5 23:02:45 2010
>> @@ -117,6 +117,7 @@
>>
>>    <!-- Announce a new connection -->
>>    <control name="announce" code="0x1">
>> +      <field name="management-id" type="str16"/>
>>      <!-- Security Strength Factor (ssf): if the transport provides
>>      encryption (e.g. ssl), ssf is the bit length of the key.  Zero if no
>>      encryption provided. -->
>> @@ -135,13 +136,18 @@
>>    <control name="abort" code="0x4"/>
>>
>>    <!-- Update controls. Sent to a new broker in joining mode.
>> -        A connection is updateed as followed:
>> -        - open as a normal connection.
>> +        A connection is updated as followed:
>> +        - send the shadow's management ID in shadow-perpare on the
>> update connection
>> +        - open the shadow as a normal connection.
>>        - attach sessions, create consumers, set flow with normal AMQP
>> cokmmands.
>>        - send /reset additional session state with controls below.
>>        - send shadow-ready to mark end of shadow update.
>>        - send membership when entire update is complete.
>>    -->
>> +    <!-- Prepare to send a shadow connection with the given ID. -->
>> +    <control name="shadow-prepare" code="0x0F">
>> +      <field name="management-id" type="str16"/>
>> +    </control>
>>
>>    <!-- Consumer state that cannot be set by standard AMQP controls. -->
>>    <control name="consumer-state" code="0x10">
>> @@ -202,6 +208,7 @@
>>    <control name="shadow-ready" code="0x20" label="End of shadow
>> connection update.">
>>      <field name="member-id" type="uint64"/>
>>      <field name="connection-id" type="uint64"/>
>> +      <field name="management-id" type="str16"/>
>>      <field name="user-name" type="str8"/>
>>      <field name="fragment" type="str32"/>
>>      <field name="send-max" type="uint32"/>
>>
>> Modified: qpid/trunk/qpid/python/commands/qpid-cluster
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/python/commands/qpid-cluster (original)
>> +++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb  5 23:02:45 2010
>> @@ -193,7 +193,6 @@
>>        self.qmf.delBroker(self.broker)
>>        self.broker = None
>>        self.brokers = []
>> -        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>>
>>        idx = 0
>>        for host in hostList:
>> @@ -209,7 +208,7 @@
>>                print "Clients on Member: ID=%s:" % displayList[idx]
>>            connList = self.qmf.getObjects(_class="connection",
>> _package="org.apache.qpid.broker", _broker=broker)
>>            for conn in connList:
>> -                if pattern.match(conn.address):
>> +                if not conn.shadow:
>>                    if self.config._numeric or self.config._delConn:
>>                        a = conn.address
>>                    else:
>>
>> Modified: qpid/trunk/qpid/python/commands/qpid-stat
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/python/commands/qpid-stat (original)
>> +++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb  5 23:02:45 2010
>> @@ -34,7 +34,6 @@
>>  _limit = 50
>>  _increasing = False
>>  _sortcol = None
>> -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>>
>>  def Usage ():
>>    print "Usage:  qpid-stat [OPTIONS] [broker-addr]"
>> @@ -108,7 +107,7 @@
>>
>>        list = qmf.getObjects(_class="connection", _package=package,
>> _agent=self.brokerAgent)
>>        for conn in list:
>> -            if pattern.match(conn.address):
>> +            if not conn.shadow:
>>                self.connections[conn.getObjectId()] = conn
>>
>>        list = qmf.getObjects(_class="session", _package=package,
>> _agent=self.brokerAgent)
>>
>> Modified: qpid/trunk/qpid/specs/management-schema.xml
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/specs/management-schema.xml (original)
>> +++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb  5 23:02:45 2010
>> @@ -236,6 +236,7 @@
>>    <property name="remoteProcessName"  type="sstr"   access="RO"
>> optional="y" desc="Name of executable running as remote client"/>
>>    <property name="remotePid"          type="uint32" access="RO"
>> optional="y" desc="Process ID of remote client"/>
>>    <property name="remoteParentPid"    type="uint32" access="RO"
>> optional="y" desc="Parent Process ID of remote client"/>
>> +    <property name="shadow"             type="bool"   access="RO"
>> desc="True for shadow connections"/>
>>    <statistic name="closing"          type="bool" desc="This client
>> is closing by management request"/>
>>    <statistic name="framesFromClient" type="count64"/>
>>    <statistic name="framesToClient"   type="count64"/>
>>
>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org