You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Alan Conway <ac...@redhat.com> on 2010/02/11 19:36:58 UTC
Re: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/]
Apologies for late reply, accident with mail filters and I missed the mail.
> This commit broke the Java Build by altering the management-schema.xml
> which is used commonly between the two brokers to generate QMF
> classes.
>
> Moreover, prior to this commit there was no JIRA or discussion on the
> list about a possible change... nor does the commit itself do much to
> explain what the change means (although since it relates to clusters,
> I can take a fair stab at guessing the answer to "is a connection on
> the Java Broker a shadow connection?" is false.)
Mea culpa, I simply did not think about the Java broker. This change did deserve
more explanation in form of JIRA.
> As I discussed in e-mails at the beginning of the year, I think we
> should be aiming at keeping the broker functionality as close a
> possible between the two brokers. This requires all of us to discuss
> upcoming proposed changes to things like management-schema.xml before
> we commit them so that both communities can have there say, and we can
> co-ordinate changes so neither community breaks the other's build.
Agreed. Oversight on my part.
> it is designed around the limitations of the C++ broker wrt Virtual
> Hosts (there can be only one) and transports/ports (similarly
> restricted). However, while I would like to rectify this before our
> next release... I will be raising a JIRA and expect to discuss the
> impact and co-ordinate timing through the list... That is unless
> people would rather I just commit the change... and let the C++ guys
> work out how to fix their build after I break it ;-)
>
> BTW thanks to Rajith for putting in an emergency fix to the Java
> codebase to make it compile again...
>
If its still a serious problem for the Java builds then we can revert my commit
- let me know.
Lets discuss the right approach on: https://issues.apache.org/jira/browse/QPID-2403
I'm on vacation from tomorrow for a week so feel free to revert my commit if
that's necessary and I haven't done so before I go.
> -- Rob
>
> ---------- Forwarded message ----------
> From: <ac...@apache.org>
> Date: 6 February 2010 00:02
> Subject: svn commit: r907123 - in /qpid/trunk/qpid:
> cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/
> specs/
> To: commits@qpid.apache.org
>
>
> Author: aconway
> Date: Fri Feb 5 23:02:45 2010
> New Revision: 907123
>
> URL: http://svn.apache.org/viewvc?rev=907123&view=rev
> Log:
> Consistent connection names across a cluster.
>
> - use the same host:port for connections and their shadows.
> - add shadow property to managment connection to identify shadows.
> - updated qpid-stat and qpid-cluster to filter on shadow property.
>
> Modified:
> qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
> qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
> qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
> qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
> qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
> qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
> qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
> qpid/trunk/qpid/cpp/xml/cluster.xml
> qpid/trunk/qpid/python/commands/qpid-cluster
> qpid/trunk/qpid/python/commands/qpid-stat
> qpid/trunk/qpid/specs/management-schema.xml
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 5 23:02:45 2010
> @@ -72,7 +72,7 @@
> }
> };
>
> -Connection::Connection(ConnectionOutputHandler* out_, Broker&
> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
> uint64_t objectId) :
> +Connection::Connection(ConnectionOutputHandler* out_, Broker&
> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
> uint64_t objectId, bool shadow_) :
> ConnectionState(out_, broker_),
> ssf(ssf),
> adapter(*this, isLink_),
> @@ -84,7 +84,7 @@
> agent(0),
> timer(broker_.getTimer()),
> errorListener(0),
> - shadow(false)
> + shadow(shadow_)
> {
> Manageable* parent = broker.GetVhostObject();
>
> @@ -95,10 +95,10 @@
> {
> agent = broker_.getManagementAgent();
>
> -
> // TODO set last bool true if system connection
> if (agent != 0) {
> mgmtObject = new _qmf::Connection(agent, this, parent,
> mgmtId, !isLink, false);
> + mgmtObject->set_shadow(shadow);
> agent->addObject(mgmtObject, objectId, true);
> }
> ConnectionState::setUrl(mgmtId);
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb 5 23:02:45 2010
> @@ -79,7 +79,7 @@
> };
>
> Connection(sys::ConnectionOutputHandler* out, Broker& broker,
> const std::string& mgmtId, unsigned int ssf,
> - bool isLink = false, uint64_t objectId = 0);
> + bool isLink = false, uint64_t objectId = 0, bool shadow=false);
> ~Connection ();
>
> /** Get the SessionHandler for channel. Create if it does not
> already exist */
> @@ -132,8 +132,6 @@
>
> /** True if this is a shadow connection in a cluster. */
> bool isShadow() { return shadow; }
> - /** Called by cluster to mark shadow connections */
> - void setShadow() { shadow = true; }
>
> // Used by cluster to update connection status
> sys::AggregateOutput& getOutputTasks() { return outputTasks; }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 5 23:02:45 2010
> @@ -509,10 +509,8 @@
> assert(cp);
> }
> else { // New remote connection, create a shadow.
> - std::ostringstream mgmtId;
> unsigned int ssf = (announce && announce->hasSsf()) ?
> announce->getSsf() : 0;
> - mgmtId << id;
> - cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
> + cp = new Connection(*this, shadowOut,
> announce->getManagementId(), id, ssf);
> }
> connections.insert(ConnectionMap::value_type(id, cp));
> }
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb 5 23:02:45 2010
> @@ -23,6 +23,7 @@
> #include "Cluster.h"
> #include "UpdateReceiver.h"
>
> +#include "qpid/assert.h"
> #include "qpid/broker/SessionState.h"
> #include "qpid/broker/SemanticState.h"
> #include "qpid/broker/TxBuffer.h"
> @@ -74,28 +75,30 @@
>
>
> // Shadow connection
> -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> const std::string& logId,
> +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> + const std::string& mgmtId,
> const ConnectionId& id, unsigned int ssf)
> : cluster(c), self(id), catchUp(false), output(*this, out),
> - connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
> + connectionCtor(&output, cluster.getBroker(), mgmtId, ssf,
> false, 0, true),
> expectProtocolHeader(false),
> mcastFrameHandler(cluster.getMulticast(), self),
> - consumerNumbering(c.getUpdateReceiver().consumerNumbering)
> + updateIn(c.getUpdateReceiver())
> {}
>
> // Local connection
> Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
> - const std::string& logId, MemberId member,
> + const std::string& mgmtId, MemberId member,
> bool isCatchUp, bool isLink, unsigned int ssf
> ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp),
> output(*this, out),
> connectionCtor(&output, cluster.getBroker(),
> - isCatchUp ? shadowPrefix+logId : logId,
> + mgmtId,
> ssf,
> isLink,
> - isCatchUp ? ++catchUpId : 0),
> + isCatchUp ? ++catchUpId : 0,
> + isCatchUp), // isCatchUp => shadow
> expectProtocolHeader(isLink),
> mcastFrameHandler(cluster.getMulticast(), self),
> - consumerNumbering(c.getUpdateReceiver().consumerNumbering)
> + updateIn(c.getUpdateReceiver())
> {
> cluster.addLocalConnection(this);
> if (isLocalClient()) {
> @@ -104,12 +107,14 @@
> QPID_LOG(info, "new client connection " << *this);
> giveReadCredit(cluster.getSettings().readMax);
> cluster.getMulticast().mcastControl(
> - ClusterConnectionAnnounceBody(ProtocolVersion(),
> getSsf()), getId());
> + ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
> getSsf()), getId());
> }
> else {
> - // Catch-up connections initialized immediately.
> + // Catch-up shadow connections initialized using nextShadow id.
> assert(catchUp);
> QPID_LOG(info, "new catch-up connection " << *this);
> + connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
> + updateIn.nextShadowMgmtId.clear();
> init();
> }
> }
> @@ -127,7 +132,6 @@
> connection->setClusterOrderOutput(nullFrameHandler);
> // Disable client throttling, done by active node.
> connection->setClientThrottling(false);
> - connection->setShadow(); // Mark the connection as a shadow.
> }
> if (!isCatchUp())
> connection->setErrorListener(this);
> @@ -138,8 +142,9 @@
> output.giveReadCredit(credit);
> }
>
> -void Connection::announce(uint32_t ssf) {
> - assert(ssf == connectionCtor.ssf);
> +void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
> + QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
> + QPID_ASSERT(ssf == connectionCtor.ssf);
> init();
> }
>
> @@ -296,13 +301,17 @@
> return sessionState().getSemanticState();
> }
>
> +void Connection::shadowPrepare(const std::string& mgmtId) {
> + updateIn.nextShadowMgmtId = mgmtId;
> +}
> +
> void Connection::consumerState(const string& name, bool blocked, bool
> notifyEnabled, const SequenceNumber& position)
> {
> broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
> c.position = position;
> c.setBlocked(blocked);
> if (notifyEnabled) c.enableNotify(); else c.disableNotify();
> - consumerNumbering.add(c.shared_from_this());
> + updateIn.consumerNumbering.add(c.shared_from_this());
> }
>
>
> @@ -337,10 +346,15 @@
> OutputTask* task = &session->getSemanticState().find(name);
> connection->getOutputTasks().addOutputTask(task);
> }
> -
> -void Connection::shadowReady(uint64_t memberId, uint64_t
> connectionId, const string& username, const string& fragment, uint32_t
> sendMax) {
> +
> +void Connection::shadowReady(
> + uint64_t memberId, uint64_t connectionId, const string& mgmtId,
> + const string& username, const string& fragment, uint32_t sendMax)
> +{
> + QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
> ConnectionId shadowId = ConnectionId(memberId, connectionId);
> - QPID_LOG(debug, cluster << " catch-up connection " << *this << "
> becomes shadow " << shadowId);
> + QPID_LOG(debug, cluster << " catch-up connection " << *this
> + << " becomes shadow " << shadowId);
> self = shadowId;
> connection->setUserId(username);
> // OK to use decoder here because cluster is stalled for update.
> @@ -355,7 +369,7 @@
> {
> QPID_LOG(debug, cluster << " incoming update complete on
> connection " << *this);
> cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
> - consumerNumbering.clear();
> + updateIn.consumerNumbering.clear();
> self.second = 0; // Mark this as completed update connection.
> }
>
> @@ -503,9 +517,9 @@
> }
>
> void Connection::addQueueListener(const std::string& q, uint32_t listener) {
> - if (listener >= consumerNumbering.size())
> + if (listener >= updateIn.consumerNumbering.size())
> throw Exception(QPID_MSG("Invalid listener ID: " << listener));
> - findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
> + findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
> }
>
> void Connection::managementSchema(const std::string& data) {
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb 5 23:02:45 2010
> @@ -65,10 +65,10 @@
> public:
>
> /** Local connection. */
> - Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& logId, MemberId, bool catchUp, bool isLink,
> + Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& mgmtId, MemberId, bool catchUp, bool isLink,
> unsigned int ssf);
> /** Shadow connection. */
> - Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& logId, const ConnectionId& id,
> + Connection(Cluster&, sys::ConnectionOutputHandler& out, const
> std::string& mgmtId, const ConnectionId& id,
> unsigned int ssf);
> ~Connection();
>
> @@ -109,6 +109,8 @@
> // ==== Used in catch-up mode to build initial state.
> //
> // State update methods.
> + void shadowPrepare(const std::string&);
> +
> void sessionState(const framing::SequenceNumber& replayStart,
> const framing::SequenceNumber& sendCommandPoint,
> const framing::SequenceSet& sentIncomplete,
> @@ -119,7 +121,12 @@
>
> void outputTask(uint16_t channel, const std::string& name);
>
> - void shadowReady(uint64_t memberId, uint64_t connectionId, const
> std::string& username, const std::string& fragment, uint32_t sendMax);
> + void shadowReady(uint64_t memberId,
> + uint64_t connectionId,
> + const std::string& managementId,
> + const std::string& username,
> + const std::string& fragment,
> + uint32_t sendMax);
>
> void membership(const framing::FieldTable&, const framing::FieldTable&,
> const framing::SequenceNumber& frameSeq,
> @@ -156,7 +163,7 @@
> void exchange(const std::string& encoded);
>
> void giveReadCredit(int credit);
> - void announce(uint32_t ssf);
> + void announce(const std::string& mgmtId, uint32_t ssf);
> void abort();
> void deliverClose();
>
> @@ -182,6 +189,7 @@
> unsigned int ssf;
> bool isLink;
> uint64_t objectId;
> + bool shadow;
>
> ConnectionCtor(
> sys::ConnectionOutputHandler* out_,
> @@ -189,12 +197,15 @@
> const std::string& mgmtId_,
> unsigned int ssf_,
> bool isLink_=false,
> - uint64_t objectId_=0
> - ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
> isLink(isLink_), objectId(objectId_) {}
> + uint64_t objectId_=0,
> + bool shadow_=false
> + ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
> + isLink(isLink_), objectId(objectId_), shadow(shadow_)
> + {}
>
> std::auto_ptr<broker::Connection> construct() {
> return std::auto_ptr<broker::Connection>(
> - new broker::Connection(out, broker, mgmtId, ssf,
> isLink, objectId));
> + new broker::Connection(out, broker, mgmtId, ssf,
> isLink, objectId, shadow));
> }
> };
>
> @@ -225,7 +236,7 @@
> boost::shared_ptr<broker::TxBuffer> txBuffer;
> bool expectProtocolHeader;
> McastFrameHandler mcastFrameHandler;
> - UpdateReceiver::ConsumerNumbering& consumerNumbering;
> + UpdateReceiver& updateIn;
>
> static qpid::sys::AtomicValue<uint64_t> catchUpId;
>
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb 5
> 23:02:45 2010
> @@ -57,6 +57,7 @@
> #include <boost/bind.hpp>
> #include <boost/cast.hpp>
> #include <algorithm>
> +#include <sstream>
>
> namespace qpid {
> namespace cluster {
> @@ -148,7 +149,7 @@
> ClusterConnectionProxy(session).expiryId(expiry.getId());
>
> updateManagementAgent();
> -
> +
> ClusterConnectionMembershipBody membership;
> map.toMethodBody(membership);
> AMQFrame frame(membership);
> @@ -328,6 +329,14 @@
>
> void UpdateClient::updateConnection(const
> boost::intrusive_ptr<Connection>& updateConnection) {
> QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
> +
> + // Send the management ID first on the main connection.
> + std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
> + ClusterConnectionProxy(session).shadowPrepare(mgmtId);
> + // Make sure its received before opening shadow connection
> + session.sync();
> +
> + // Open shadow connection and update it.
> shadowConnection = catchUpConnection();
>
> broker::Connection& bc = updateConnection->getBrokerConnection();
> @@ -341,6 +350,7 @@
> ClusterConnectionProxy(shadowConnection).shadowReady(
> updateConnection->getId().getMember(),
> updateConnection->getId().getNumber(),
> + bc.getMgmtId(),
> bc.getUserId(),
> string(fragment.first, fragment.second),
> updateConnection->getOutput().getSendMax()
>
> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb 5
> 23:02:45 2010
> @@ -36,6 +36,9 @@
> /** Numbering used to identify Queue listeners as consumers */
> typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl>
>> > ConsumerNumbering;
> ConsumerNumbering consumerNumbering;
> +
> + /** Management-id for the next shadow connection */
> + std::string nextShadowMgmtId;
> };
> }} // namespace qpid::cluster
>
>
> Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
> +++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb 5 23:02:45 2010
> @@ -117,6 +117,7 @@
>
> <!-- Announce a new connection -->
> <control name="announce" code="0x1">
> + <field name="management-id" type="str16"/>
> <!-- Security Strength Factor (ssf): if the transport provides
> encryption (e.g. ssl), ssf is the bit length of the key. Zero if no
> encryption provided. -->
> @@ -135,13 +136,18 @@
> <control name="abort" code="0x4"/>
>
> <!-- Update controls. Sent to a new broker in joining mode.
> - A connection is updateed as followed:
> - - open as a normal connection.
> + A connection is updated as followed:
> + - send the shadow's management ID in shadow-perpare on the
> update connection
> + - open the shadow as a normal connection.
> - attach sessions, create consumers, set flow with normal AMQP
> cokmmands.
> - send /reset additional session state with controls below.
> - send shadow-ready to mark end of shadow update.
> - send membership when entire update is complete.
> -->
> + <!-- Prepare to send a shadow connection with the given ID. -->
> + <control name="shadow-prepare" code="0x0F">
> + <field name="management-id" type="str16"/>
> + </control>
>
> <!-- Consumer state that cannot be set by standard AMQP controls. -->
> <control name="consumer-state" code="0x10">
> @@ -202,6 +208,7 @@
> <control name="shadow-ready" code="0x20" label="End of shadow
> connection update.">
> <field name="member-id" type="uint64"/>
> <field name="connection-id" type="uint64"/>
> + <field name="management-id" type="str16"/>
> <field name="user-name" type="str8"/>
> <field name="fragment" type="str32"/>
> <field name="send-max" type="uint32"/>
>
> Modified: qpid/trunk/qpid/python/commands/qpid-cluster
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/python/commands/qpid-cluster (original)
> +++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb 5 23:02:45 2010
> @@ -193,7 +193,6 @@
> self.qmf.delBroker(self.broker)
> self.broker = None
> self.brokers = []
> - pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>
> idx = 0
> for host in hostList:
> @@ -209,7 +208,7 @@
> print "Clients on Member: ID=%s:" % displayList[idx]
> connList = self.qmf.getObjects(_class="connection",
> _package="org.apache.qpid.broker", _broker=broker)
> for conn in connList:
> - if pattern.match(conn.address):
> + if not conn.shadow:
> if self.config._numeric or self.config._delConn:
> a = conn.address
> else:
>
> Modified: qpid/trunk/qpid/python/commands/qpid-stat
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/python/commands/qpid-stat (original)
> +++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb 5 23:02:45 2010
> @@ -34,7 +34,6 @@
> _limit = 50
> _increasing = False
> _sortcol = None
> -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>
> def Usage ():
> print "Usage: qpid-stat [OPTIONS] [broker-addr]"
> @@ -108,7 +107,7 @@
>
> list = qmf.getObjects(_class="connection", _package=package,
> _agent=self.brokerAgent)
> for conn in list:
> - if pattern.match(conn.address):
> + if not conn.shadow:
> self.connections[conn.getObjectId()] = conn
>
> list = qmf.getObjects(_class="session", _package=package,
> _agent=self.brokerAgent)
>
> Modified: qpid/trunk/qpid/specs/management-schema.xml
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/specs/management-schema.xml (original)
> +++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb 5 23:02:45 2010
> @@ -236,6 +236,7 @@
> <property name="remoteProcessName" type="sstr" access="RO"
> optional="y" desc="Name of executable running as remote client"/>
> <property name="remotePid" type="uint32" access="RO"
> optional="y" desc="Process ID of remote client"/>
> <property name="remoteParentPid" type="uint32" access="RO"
> optional="y" desc="Parent Process ID of remote client"/>
> + <property name="shadow" type="bool" access="RO"
> desc="True for shadow connections"/>
> <statistic name="closing" type="bool" desc="This client
> is closing by management request"/>
> <statistic name="framesFromClient" type="count64"/>
> <statistic name="framesToClient" type="count64"/>
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
Re: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/]
Posted by Robert Godfrey <ro...@gmail.com>.
On 11 February 2010 19:36, Alan Conway <ac...@redhat.com> wrote:
> Apologies for late reply, accident with mail filters and I missed the mail.
>
>> This commit broke the Java Build by altering the management-schema.xml
>> which is used commonly between the two brokers to generate QMF
>> classes.
>>
>> Moreover, prior to this commit there was no JIRA or discussion on the
>> list about a possible change... nor does the commit itself do much to
>> explain what the change means (although since it relates to clusters,
>> I can take a fair stab at guessing the answer to "is a connection on
>> the Java Broker a shadow connection?" is false.)
>
> Mea culpa, I simply did not think about the Java broker. This change did
> deserve more explanation in form of JIRA.
>
>> As I discussed in e-mails at the beginning of the year, I think we
>> should be aiming at keeping the broker functionality as close a
>> possible between the two brokers. This requires all of us to discuss
>> upcoming proposed changes to things like management-schema.xml before
>> we commit them so that both communities can have there say, and we can
>> co-ordinate changes so neither community breaks the other's build.
>
> Agreed. Oversight on my part.
>
>> it is designed around the limitations of the C++ broker wrt Virtual
>> Hosts (there can be only one) and transports/ports (similarly
>> restricted). However, while I would like to rectify this before our
>> next release... I will be raising a JIRA and expect to discuss the
>> impact and co-ordinate timing through the list... That is unless
>> people would rather I just commit the change... and let the C++ guys
>> work out how to fix their build after I break it ;-)
>>
>> BTW thanks to Rajith for putting in an emergency fix to the Java
>> codebase to make it compile again...
>>
>
> If its still a serious problem for the Java builds then we can revert my
> commit - let me know.
No problem - Rajith's commit stopped the build from failing though
querying the connection object via QMF on the Java Broker may make the
QMF client unwell right now :-)
> Lets discuss the right approach on:
> https://issues.apache.org/jira/browse/QPID-2403
>
> I'm on vacation from tomorrow for a week so feel free to revert my commit if
> that's necessary and I haven't done so before I go.
>
No need to revert... the change was a relatively minor one :-) I'm
(notionally) fine with having the attribute there and just having the
Java Broker always report false... It does seem to me though more of a
generic issue with managing a broker which is part of a cluster...
objects which are present in the logical "clustered" broker, but
aren't actually resident on the broker you are querying...
BTW I'm also out next week..
-- Rob
>>
>> ---------- Forwarded message ----------
>> From: <ac...@apache.org>
>> Date: 6 February 2010 00:02
>> Subject: svn commit: r907123 - in /qpid/trunk/qpid:
>> cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/
>> specs/
>> To: commits@qpid.apache.org
>>
>>
>> Author: aconway
>> Date: Fri Feb 5 23:02:45 2010
>> New Revision: 907123
>>
>> URL: http://svn.apache.org/viewvc?rev=907123&view=rev
>> Log:
>> Consistent connection names across a cluster.
>>
>> - use the same host:port for connections and their shadows.
>> - add shadow property to managment connection to identify shadows.
>> - updated qpid-stat and qpid-cluster to filter on shadow property.
>>
>> Modified:
>> qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
>> qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
>> qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
>> qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
>> qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
>> qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
>> qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
>> qpid/trunk/qpid/cpp/xml/cluster.xml
>> qpid/trunk/qpid/python/commands/qpid-cluster
>> qpid/trunk/qpid/python/commands/qpid-stat
>> qpid/trunk/qpid/specs/management-schema.xml
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 5 23:02:45
>> 2010
>> @@ -72,7 +72,7 @@
>> }
>> };
>>
>> -Connection::Connection(ConnectionOutputHandler* out_, Broker&
>> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
>> uint64_t objectId) :
>> +Connection::Connection(ConnectionOutputHandler* out_, Broker&
>> broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
>> uint64_t objectId, bool shadow_) :
>> ConnectionState(out_, broker_),
>> ssf(ssf),
>> adapter(*this, isLink_),
>> @@ -84,7 +84,7 @@
>> agent(0),
>> timer(broker_.getTimer()),
>> errorListener(0),
>> - shadow(false)
>> + shadow(shadow_)
>> {
>> Manageable* parent = broker.GetVhostObject();
>>
>> @@ -95,10 +95,10 @@
>> {
>> agent = broker_.getManagementAgent();
>>
>> -
>> // TODO set last bool true if system connection
>> if (agent != 0) {
>> mgmtObject = new _qmf::Connection(agent, this, parent,
>> mgmtId, !isLink, false);
>> + mgmtObject->set_shadow(shadow);
>> agent->addObject(mgmtObject, objectId, true);
>> }
>> ConnectionState::setUrl(mgmtId);
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb 5 23:02:45
>> 2010
>> @@ -79,7 +79,7 @@
>> };
>>
>> Connection(sys::ConnectionOutputHandler* out, Broker& broker,
>> const std::string& mgmtId, unsigned int ssf,
>> - bool isLink = false, uint64_t objectId = 0);
>> + bool isLink = false, uint64_t objectId = 0, bool
>> shadow=false);
>> ~Connection ();
>>
>> /** Get the SessionHandler for channel. Create if it does not
>> already exist */
>> @@ -132,8 +132,6 @@
>>
>> /** True if this is a shadow connection in a cluster. */
>> bool isShadow() { return shadow; }
>> - /** Called by cluster to mark shadow connections */
>> - void setShadow() { shadow = true; }
>>
>> // Used by cluster to update connection status
>> sys::AggregateOutput& getOutputTasks() { return outputTasks; }
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 5 23:02:45
>> 2010
>> @@ -509,10 +509,8 @@
>> assert(cp);
>> }
>> else { // New remote connection, create a shadow.
>> - std::ostringstream mgmtId;
>> unsigned int ssf = (announce && announce->hasSsf()) ?
>> announce->getSsf() : 0;
>> - mgmtId << id;
>> - cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
>> + cp = new Connection(*this, shadowOut,
>> announce->getManagementId(), id, ssf);
>> }
>> connections.insert(ConnectionMap::value_type(id, cp));
>> }
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb 5
>> 23:02:45 2010
>> @@ -23,6 +23,7 @@
>> #include "Cluster.h"
>> #include "UpdateReceiver.h"
>>
>> +#include "qpid/assert.h"
>> #include "qpid/broker/SessionState.h"
>> #include "qpid/broker/SemanticState.h"
>> #include "qpid/broker/TxBuffer.h"
>> @@ -74,28 +75,30 @@
>>
>>
>> // Shadow connection
>> -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> const std::string& logId,
>> +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> + const std::string& mgmtId,
>> const ConnectionId& id, unsigned int ssf)
>> : cluster(c), self(id), catchUp(false), output(*this, out),
>> - connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId,
>> ssf),
>> + connectionCtor(&output, cluster.getBroker(), mgmtId, ssf,
>> false, 0, true),
>> expectProtocolHeader(false),
>> mcastFrameHandler(cluster.getMulticast(), self),
>> - consumerNumbering(c.getUpdateReceiver().consumerNumbering)
>> + updateIn(c.getUpdateReceiver())
>> {}
>>
>> // Local connection
>> Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
>> - const std::string& logId, MemberId member,
>> + const std::string& mgmtId, MemberId member,
>> bool isCatchUp, bool isLink, unsigned int ssf
>> ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp),
>> output(*this, out),
>> connectionCtor(&output, cluster.getBroker(),
>> - isCatchUp ? shadowPrefix+logId : logId,
>> + mgmtId,
>> ssf,
>> isLink,
>> - isCatchUp ? ++catchUpId : 0),
>> + isCatchUp ? ++catchUpId : 0,
>> + isCatchUp), // isCatchUp => shadow
>> expectProtocolHeader(isLink),
>> mcastFrameHandler(cluster.getMulticast(), self),
>> - consumerNumbering(c.getUpdateReceiver().consumerNumbering)
>> + updateIn(c.getUpdateReceiver())
>> {
>> cluster.addLocalConnection(this);
>> if (isLocalClient()) {
>> @@ -104,12 +107,14 @@
>> QPID_LOG(info, "new client connection " << *this);
>> giveReadCredit(cluster.getSettings().readMax);
>> cluster.getMulticast().mcastControl(
>> - ClusterConnectionAnnounceBody(ProtocolVersion(),
>> getSsf()), getId());
>> + ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
>> getSsf()), getId());
>> }
>> else {
>> - // Catch-up connections initialized immediately.
>> + // Catch-up shadow connections initialized using nextShadow id.
>> assert(catchUp);
>> QPID_LOG(info, "new catch-up connection " << *this);
>> + connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
>> + updateIn.nextShadowMgmtId.clear();
>> init();
>> }
>> }
>> @@ -127,7 +132,6 @@
>> connection->setClusterOrderOutput(nullFrameHandler);
>> // Disable client throttling, done by active node.
>> connection->setClientThrottling(false);
>> - connection->setShadow(); // Mark the connection as a shadow.
>> }
>> if (!isCatchUp())
>> connection->setErrorListener(this);
>> @@ -138,8 +142,9 @@
>> output.giveReadCredit(credit);
>> }
>>
>> -void Connection::announce(uint32_t ssf) {
>> - assert(ssf == connectionCtor.ssf);
>> +void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
>> + QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
>> + QPID_ASSERT(ssf == connectionCtor.ssf);
>> init();
>> }
>>
>> @@ -296,13 +301,17 @@
>> return sessionState().getSemanticState();
>> }
>>
>> +void Connection::shadowPrepare(const std::string& mgmtId) {
>> + updateIn.nextShadowMgmtId = mgmtId;
>> +}
>> +
>> void Connection::consumerState(const string& name, bool blocked, bool
>> notifyEnabled, const SequenceNumber& position)
>> {
>> broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
>> c.position = position;
>> c.setBlocked(blocked);
>> if (notifyEnabled) c.enableNotify(); else c.disableNotify();
>> - consumerNumbering.add(c.shared_from_this());
>> + updateIn.consumerNumbering.add(c.shared_from_this());
>> }
>>
>>
>> @@ -337,10 +346,15 @@
>> OutputTask* task = &session->getSemanticState().find(name);
>> connection->getOutputTasks().addOutputTask(task);
>> }
>> -
>> -void Connection::shadowReady(uint64_t memberId, uint64_t
>> connectionId, const string& username, const string& fragment, uint32_t
>> sendMax) {
>> +
>> +void Connection::shadowReady(
>> + uint64_t memberId, uint64_t connectionId, const string& mgmtId,
>> + const string& username, const string& fragment, uint32_t sendMax)
>> +{
>> + QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
>> ConnectionId shadowId = ConnectionId(memberId, connectionId);
>> - QPID_LOG(debug, cluster << " catch-up connection " << *this << "
>> becomes shadow " << shadowId);
>> + QPID_LOG(debug, cluster << " catch-up connection " << *this
>> + << " becomes shadow " << shadowId);
>> self = shadowId;
>> connection->setUserId(username);
>> // OK to use decoder here because cluster is stalled for update.
>> @@ -355,7 +369,7 @@
>> {
>> QPID_LOG(debug, cluster << " incoming update complete on
>> connection " << *this);
>> cluster.updateInDone(ClusterMap(joiners, members, frameSeq,
>> configSeq));
>> - consumerNumbering.clear();
>> + updateIn.consumerNumbering.clear();
>> self.second = 0; // Mark this as completed update connection.
>> }
>>
>> @@ -503,9 +517,9 @@
>> }
>>
>> void Connection::addQueueListener(const std::string& q, uint32_t
>> listener) {
>> - if (listener >= consumerNumbering.size())
>> + if (listener >= updateIn.consumerNumbering.size())
>> throw Exception(QPID_MSG("Invalid listener ID: " << listener));
>> -
>> findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
>> +
>> findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
>> }
>>
>> void Connection::managementSchema(const std::string& data) {
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb 5 23:02:45
>> 2010
>> @@ -65,10 +65,10 @@
>> public:
>>
>> /** Local connection. */
>> - Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& logId, MemberId, bool catchUp, bool isLink,
>> + Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& mgmtId, MemberId, bool catchUp, bool isLink,
>> unsigned int ssf);
>> /** Shadow connection. */
>> - Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& logId, const ConnectionId& id,
>> + Connection(Cluster&, sys::ConnectionOutputHandler& out, const
>> std::string& mgmtId, const ConnectionId& id,
>> unsigned int ssf);
>> ~Connection();
>>
>> @@ -109,6 +109,8 @@
>> // ==== Used in catch-up mode to build initial state.
>> //
>> // State update methods.
>> + void shadowPrepare(const std::string&);
>> +
>> void sessionState(const framing::SequenceNumber& replayStart,
>> const framing::SequenceNumber& sendCommandPoint,
>> const framing::SequenceSet& sentIncomplete,
>> @@ -119,7 +121,12 @@
>>
>> void outputTask(uint16_t channel, const std::string& name);
>>
>> - void shadowReady(uint64_t memberId, uint64_t connectionId, const
>> std::string& username, const std::string& fragment, uint32_t sendMax);
>> + void shadowReady(uint64_t memberId,
>> + uint64_t connectionId,
>> + const std::string& managementId,
>> + const std::string& username,
>> + const std::string& fragment,
>> + uint32_t sendMax);
>>
>> void membership(const framing::FieldTable&, const framing::FieldTable&,
>> const framing::SequenceNumber& frameSeq,
>> @@ -156,7 +163,7 @@
>> void exchange(const std::string& encoded);
>>
>> void giveReadCredit(int credit);
>> - void announce(uint32_t ssf);
>> + void announce(const std::string& mgmtId, uint32_t ssf);
>> void abort();
>> void deliverClose();
>>
>> @@ -182,6 +189,7 @@
>> unsigned int ssf;
>> bool isLink;
>> uint64_t objectId;
>> + bool shadow;
>>
>> ConnectionCtor(
>> sys::ConnectionOutputHandler* out_,
>> @@ -189,12 +197,15 @@
>> const std::string& mgmtId_,
>> unsigned int ssf_,
>> bool isLink_=false,
>> - uint64_t objectId_=0
>> - ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
>> isLink(isLink_), objectId(objectId_) {}
>> + uint64_t objectId_=0,
>> + bool shadow_=false
>> + ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
>> + isLink(isLink_), objectId(objectId_), shadow(shadow_)
>> + {}
>>
>> std::auto_ptr<broker::Connection> construct() {
>> return std::auto_ptr<broker::Connection>(
>> - new broker::Connection(out, broker, mgmtId, ssf,
>> isLink, objectId));
>> + new broker::Connection(out, broker, mgmtId, ssf,
>> isLink, objectId, shadow));
>> }
>> };
>>
>> @@ -225,7 +236,7 @@
>> boost::shared_ptr<broker::TxBuffer> txBuffer;
>> bool expectProtocolHeader;
>> McastFrameHandler mcastFrameHandler;
>> - UpdateReceiver::ConsumerNumbering& consumerNumbering;
>> + UpdateReceiver& updateIn;
>>
>> static qpid::sys::AtomicValue<uint64_t> catchUpId;
>>
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb 5
>> 23:02:45 2010
>> @@ -57,6 +57,7 @@
>> #include <boost/bind.hpp>
>> #include <boost/cast.hpp>
>> #include <algorithm>
>> +#include <sstream>
>>
>> namespace qpid {
>> namespace cluster {
>> @@ -148,7 +149,7 @@
>> ClusterConnectionProxy(session).expiryId(expiry.getId());
>>
>> updateManagementAgent();
>> -
>> +
>> ClusterConnectionMembershipBody membership;
>> map.toMethodBody(membership);
>> AMQFrame frame(membership);
>> @@ -328,6 +329,14 @@
>>
>> void UpdateClient::updateConnection(const
>> boost::intrusive_ptr<Connection>& updateConnection) {
>> QPID_LOG(debug, updaterId << " updating connection " <<
>> *updateConnection);
>> +
>> + // Send the management ID first on the main connection.
>> + std::string mgmtId =
>> updateConnection->getBrokerConnection().getMgmtId();
>> + ClusterConnectionProxy(session).shadowPrepare(mgmtId);
>> + // Make sure its received before opening shadow connection
>> + session.sync();
>> +
>> + // Open shadow connection and update it.
>> shadowConnection = catchUpConnection();
>>
>> broker::Connection& bc = updateConnection->getBrokerConnection();
>> @@ -341,6 +350,7 @@
>> ClusterConnectionProxy(shadowConnection).shadowReady(
>> updateConnection->getId().getMember(),
>> updateConnection->getId().getNumber(),
>> + bc.getMgmtId(),
>> bc.getUserId(),
>> string(fragment.first, fragment.second),
>> updateConnection->getOutput().getSendMax()
>>
>> Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
>> +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb 5
>> 23:02:45 2010
>> @@ -36,6 +36,9 @@
>> /** Numbering used to identify Queue listeners as consumers */
>> typedef
>> Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl>
>>>
>>> > ConsumerNumbering;
>>
>> ConsumerNumbering consumerNumbering;
>> +
>> + /** Management-id for the next shadow connection */
>> + std::string nextShadowMgmtId;
>> };
>> }} // namespace qpid::cluster
>>
>>
>> Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
>> +++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb 5 23:02:45 2010
>> @@ -117,6 +117,7 @@
>>
>> <!-- Announce a new connection -->
>> <control name="announce" code="0x1">
>> + <field name="management-id" type="str16"/>
>> <!-- Security Strength Factor (ssf): if the transport provides
>> encryption (e.g. ssl), ssf is the bit length of the key. Zero if no
>> encryption provided. -->
>> @@ -135,13 +136,18 @@
>> <control name="abort" code="0x4"/>
>>
>> <!-- Update controls. Sent to a new broker in joining mode.
>> - A connection is updateed as followed:
>> - - open as a normal connection.
>> + A connection is updated as followed:
>> + - send the shadow's management ID in shadow-perpare on the
>> update connection
>> + - open the shadow as a normal connection.
>> - attach sessions, create consumers, set flow with normal AMQP
>> cokmmands.
>> - send /reset additional session state with controls below.
>> - send shadow-ready to mark end of shadow update.
>> - send membership when entire update is complete.
>> -->
>> + <!-- Prepare to send a shadow connection with the given ID. -->
>> + <control name="shadow-prepare" code="0x0F">
>> + <field name="management-id" type="str16"/>
>> + </control>
>>
>> <!-- Consumer state that cannot be set by standard AMQP controls. -->
>> <control name="consumer-state" code="0x10">
>> @@ -202,6 +208,7 @@
>> <control name="shadow-ready" code="0x20" label="End of shadow
>> connection update.">
>> <field name="member-id" type="uint64"/>
>> <field name="connection-id" type="uint64"/>
>> + <field name="management-id" type="str16"/>
>> <field name="user-name" type="str8"/>
>> <field name="fragment" type="str32"/>
>> <field name="send-max" type="uint32"/>
>>
>> Modified: qpid/trunk/qpid/python/commands/qpid-cluster
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/python/commands/qpid-cluster (original)
>> +++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb 5 23:02:45 2010
>> @@ -193,7 +193,6 @@
>> self.qmf.delBroker(self.broker)
>> self.broker = None
>> self.brokers = []
>> - pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>>
>> idx = 0
>> for host in hostList:
>> @@ -209,7 +208,7 @@
>> print "Clients on Member: ID=%s:" % displayList[idx]
>> connList = self.qmf.getObjects(_class="connection",
>> _package="org.apache.qpid.broker", _broker=broker)
>> for conn in connList:
>> - if pattern.match(conn.address):
>> + if not conn.shadow:
>> if self.config._numeric or self.config._delConn:
>> a = conn.address
>> else:
>>
>> Modified: qpid/trunk/qpid/python/commands/qpid-stat
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/python/commands/qpid-stat (original)
>> +++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb 5 23:02:45 2010
>> @@ -34,7 +34,6 @@
>> _limit = 50
>> _increasing = False
>> _sortcol = None
>> -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
>>
>> def Usage ():
>> print "Usage: qpid-stat [OPTIONS] [broker-addr]"
>> @@ -108,7 +107,7 @@
>>
>> list = qmf.getObjects(_class="connection", _package=package,
>> _agent=self.brokerAgent)
>> for conn in list:
>> - if pattern.match(conn.address):
>> + if not conn.shadow:
>> self.connections[conn.getObjectId()] = conn
>>
>> list = qmf.getObjects(_class="session", _package=package,
>> _agent=self.brokerAgent)
>>
>> Modified: qpid/trunk/qpid/specs/management-schema.xml
>> URL:
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
>>
>> ==============================================================================
>> --- qpid/trunk/qpid/specs/management-schema.xml (original)
>> +++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb 5 23:02:45 2010
>> @@ -236,6 +236,7 @@
>> <property name="remoteProcessName" type="sstr" access="RO"
>> optional="y" desc="Name of executable running as remote client"/>
>> <property name="remotePid" type="uint32" access="RO"
>> optional="y" desc="Process ID of remote client"/>
>> <property name="remoteParentPid" type="uint32" access="RO"
>> optional="y" desc="Parent Process ID of remote client"/>
>> + <property name="shadow" type="bool" access="RO"
>> desc="True for shadow connections"/>
>> <statistic name="closing" type="bool" desc="This client
>> is closing by management request"/>
>> <statistic name="framesFromClient" type="count64"/>
>> <statistic name="framesToClient" type="count64"/>
>>
>>
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project: http://qpid.apache.org
>> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project: http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>
>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org