You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/02/06 00:02:46 UTC

svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/

Author: aconway
Date: Fri Feb  5 23:02:45 2010
New Revision: 907123

URL: http://svn.apache.org/viewvc?rev=907123&view=rev
Log:
Consistent connection names across a cluster.

- use the same host:port for connections and their shadows.
- add shadow property to managment connection to identify shadows.
- updated qpid-stat and qpid-cluster to filter on shadow property.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/commands/qpid-cluster
    qpid/trunk/qpid/python/commands/qpid-stat
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb  5 23:02:45 2010
@@ -72,7 +72,7 @@
     }
 };
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId, bool shadow_) :
     ConnectionState(out_, broker_),
     ssf(ssf),
     adapter(*this, isLink_),
@@ -84,7 +84,7 @@
     agent(0),
     timer(broker_.getTimer()),
     errorListener(0),
-    shadow(false)
+    shadow(shadow_)
 {
     Manageable* parent = broker.GetVhostObject();
 
@@ -95,10 +95,10 @@
     {
         agent = broker_.getManagementAgent();
 
-
         // TODO set last bool true if system connection
         if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
+            mgmtObject->set_shadow(shadow);
             agent->addObject(mgmtObject, objectId, true);
         }
         ConnectionState::setUrl(mgmtId);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb  5 23:02:45 2010
@@ -79,7 +79,7 @@
     };
 
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, unsigned int ssf,
-               bool isLink = false, uint64_t objectId = 0);
+               bool isLink = false, uint64_t objectId = 0, bool shadow=false);
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
@@ -132,8 +132,6 @@
 
     /** True if this is a shadow connection in a cluster. */
     bool isShadow() { return shadow; }
-    /** Called by cluster to mark shadow connections */
-    void setShadow() { shadow = true; }
 
     // Used by cluster to update connection status
     sys::AggregateOutput& getOutputTasks() { return outputTasks; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  5 23:02:45 2010
@@ -509,10 +509,8 @@
             assert(cp);
         }
         else {              // New remote connection, create a shadow.
-            std::ostringstream mgmtId;
             unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
-            mgmtId << id;
-            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
+            cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf);
         }
         connections.insert(ConnectionMap::value_type(id, cp));
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb  5 23:02:45 2010
@@ -23,6 +23,7 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"
 
+#include "qpid/assert.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -74,28 +75,30 @@
 
 
 // Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+                       const std::string& mgmtId,
                        const ConnectionId& id, unsigned int ssf)
     : cluster(c), self(id), catchUp(false), output(*this, out),
-      connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+      connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true),
       expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
-      consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+      updateIn(c.getUpdateReceiver())
 {}
 
 // Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& logId, MemberId member,
+                       const std::string& mgmtId, MemberId member,
                        bool isCatchUp, bool isLink, unsigned int ssf
 ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
     connectionCtor(&output, cluster.getBroker(),
-                   isCatchUp ? shadowPrefix+logId : logId,
+                   mgmtId,
                    ssf,
                    isLink,
-                   isCatchUp ? ++catchUpId : 0),
+                   isCatchUp ? ++catchUpId : 0,
+                   isCatchUp),  // isCatchUp => shadow
     expectProtocolHeader(isLink),
     mcastFrameHandler(cluster.getMulticast(), self),
-    consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+    updateIn(c.getUpdateReceiver())
 {
     cluster.addLocalConnection(this);
     if (isLocalClient()) {
@@ -104,12 +107,14 @@
         QPID_LOG(info, "new client connection " << *this);
         giveReadCredit(cluster.getSettings().readMax);
         cluster.getMulticast().mcastControl(
-            ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId());
     }
     else {
-        // Catch-up connections initialized immediately.
+        // Catch-up shadow connections initialized using nextShadow id.
         assert(catchUp);
         QPID_LOG(info, "new catch-up connection " << *this);
+        connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+        updateIn.nextShadowMgmtId.clear();
         init();
     }
 }
@@ -127,7 +132,6 @@
         connection->setClusterOrderOutput(nullFrameHandler);
         // Disable client throttling, done by active node.
         connection->setClientThrottling(false);
-        connection->setShadow(); // Mark the connection as a shadow.
     }
     if (!isCatchUp())
         connection->setErrorListener(this);
@@ -138,8 +142,9 @@
         output.giveReadCredit(credit);
 }
 
-void Connection::announce(uint32_t ssf) {
-    assert(ssf == connectionCtor.ssf);
+void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+    QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
+    QPID_ASSERT(ssf == connectionCtor.ssf);
     init();
 }
 
@@ -296,13 +301,17 @@
     return sessionState().getSemanticState();
 }
 
+void Connection::shadowPrepare(const std::string& mgmtId) {
+    updateIn.nextShadowMgmtId = mgmtId;
+}
+
 void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
 {
     broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
     c.position = position;
     c.setBlocked(blocked);
     if (notifyEnabled) c.enableNotify(); else c.disableNotify();
-    consumerNumbering.add(c.shared_from_this());
+    updateIn.consumerNumbering.add(c.shared_from_this());
 }
 
 
@@ -337,10 +346,15 @@
     OutputTask* task = &session->getSemanticState().find(name);
     connection->getOutputTasks().addOutputTask(task);
 }
-    
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
+
+void Connection::shadowReady(
+    uint64_t memberId, uint64_t connectionId, const string& mgmtId,
+    const string& username, const string& fragment, uint32_t sendMax)
+{
+    QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
     ConnectionId shadowId = ConnectionId(memberId, connectionId);
-    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+    QPID_LOG(debug, cluster << " catch-up connection " << *this
+             << " becomes shadow " << shadowId);
     self = shadowId;
     connection->setUserId(username);
     // OK to use decoder here because cluster is stalled for update.
@@ -355,7 +369,7 @@
 {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
-    consumerNumbering.clear();
+    updateIn.consumerNumbering.clear();
     self.second = 0;        // Mark this as completed update connection.
 }
 
@@ -503,9 +517,9 @@
 }
 
 void Connection::addQueueListener(const std::string& q, uint32_t listener) {
-    if (listener >= consumerNumbering.size())
+    if (listener >= updateIn.consumerNumbering.size())
         throw Exception(QPID_MSG("Invalid listener ID: " << listener));
-    findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+    findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
 }
 
 void Connection::managementSchema(const std::string& data) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb  5 23:02:45 2010
@@ -65,10 +65,10 @@
   public:
     
     /** Local connection. */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink,
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
                unsigned int ssf);
     /** Shadow connection. */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id,
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
                unsigned int ssf);
     ~Connection();
     
@@ -109,6 +109,8 @@
     // ==== Used in catch-up mode to build initial state.
     // 
     // State update methods.
+    void shadowPrepare(const std::string&);
+
     void sessionState(const framing::SequenceNumber& replayStart,
                       const framing::SequenceNumber& sendCommandPoint,
                       const framing::SequenceSet& sentIncomplete,
@@ -119,7 +121,12 @@
     
     void outputTask(uint16_t channel, const std::string& name);
     
-    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
+    void shadowReady(uint64_t memberId,
+                     uint64_t connectionId,
+                     const std::string& managementId,
+                     const std::string& username,
+                     const std::string& fragment,
+                     uint32_t sendMax);
 
     void membership(const framing::FieldTable&, const framing::FieldTable&,
                     const framing::SequenceNumber& frameSeq,
@@ -156,7 +163,7 @@
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    void announce(uint32_t ssf);
+    void announce(const std::string& mgmtId, uint32_t ssf);
     void abort();
     void deliverClose();
 
@@ -182,6 +189,7 @@
         unsigned int ssf;
         bool isLink;
         uint64_t objectId;
+        bool shadow;
 
         ConnectionCtor(
             sys::ConnectionOutputHandler* out_,
@@ -189,12 +197,15 @@
             const std::string& mgmtId_,
             unsigned int ssf_,
             bool isLink_=false,
-            uint64_t objectId_=0
-        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {}
+            uint64_t objectId_=0,
+            bool shadow_=false
+        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
+            isLink(isLink_), objectId(objectId_), shadow(shadow_)
+        {}
 
         std::auto_ptr<broker::Connection> construct() {
             return std::auto_ptr<broker::Connection>(
-                new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId));
+                new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId, shadow));
         }
     };
 
@@ -225,7 +236,7 @@
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
-    UpdateReceiver::ConsumerNumbering& consumerNumbering;
+    UpdateReceiver& updateIn;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb  5 23:02:45 2010
@@ -57,6 +57,7 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
+#include <sstream>
 
 namespace qpid {
 namespace cluster {
@@ -148,7 +149,7 @@
     ClusterConnectionProxy(session).expiryId(expiry.getId());
 
     updateManagementAgent();
-    
+
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);
@@ -328,6 +329,14 @@
 
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
     QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+
+    // Send the management ID first on the main connection.
+    std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
+    ClusterConnectionProxy(session).shadowPrepare(mgmtId);
+    // Make sure its received before opening shadow connection
+    session.sync();
+
+    // Open shadow connection and update it.
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -341,6 +350,7 @@
     ClusterConnectionProxy(shadowConnection).shadowReady(
         updateConnection->getId().getMember(),
         updateConnection->getId().getNumber(),
+        bc.getMgmtId(),
         bc.getUserId(),
         string(fragment.first, fragment.second),
         updateConnection->getOutput().getSendMax()

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb  5 23:02:45 2010
@@ -36,6 +36,9 @@
     /** Numbering used to identify Queue listeners as consumers */ 
     typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering;
     ConsumerNumbering consumerNumbering;
+
+    /** Management-id for the next shadow connection */
+    std::string nextShadowMgmtId;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb  5 23:02:45 2010
@@ -117,6 +117,7 @@
 
     <!-- Announce a new connection -->
     <control name="announce" code="0x1">
+      <field name="management-id" type="str16"/>
       <!-- Security Strength Factor (ssf): if the transport provides
       encryption (e.g. ssl), ssf is the bit length of the key.  Zero if no
       encryption provided. -->
@@ -135,13 +136,18 @@
     <control name="abort" code="0x4"/>
     
     <!-- Update controls. Sent to a new broker in joining mode.
-	 A connection is updateed as followed:
-	 - open as a normal connection.
+	 A connection is updated as followed:
+	 - send the shadow's management ID in shadow-perpare on the update connection
+	 - open the shadow as a normal connection.
 	 - attach sessions, create consumers, set flow with normal AMQP cokmmands.
 	 - send /reset additional session state with controls below.
 	 - send shadow-ready to mark end of shadow update.
 	 - send membership when entire update is complete.
     -->
+    <!-- Prepare to send a shadow connection with the given ID. -->
+    <control name="shadow-prepare" code="0x0F">
+      <field name="management-id" type="str16"/>
+    </control>
     
     <!-- Consumer state that cannot be set by standard AMQP controls. -->
     <control name="consumer-state" code="0x10">
@@ -202,6 +208,7 @@
     <control name="shadow-ready" code="0x20" label="End of shadow connection update.">
       <field name="member-id" type="uint64"/>
       <field name="connection-id" type="uint64"/>
+      <field name="management-id" type="str16"/>
       <field name="user-name" type="str8"/>
       <field name="fragment" type="str32"/>
       <field name="send-max" type="uint32"/>

Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb  5 23:02:45 2010
@@ -193,7 +193,6 @@
         self.qmf.delBroker(self.broker)
         self.broker = None
         self.brokers = []
-        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
 
         idx = 0
         for host in hostList:
@@ -209,7 +208,7 @@
                 print "Clients on Member: ID=%s:" % displayList[idx]
             connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
             for conn in connList:
-                if pattern.match(conn.address):
+                if not conn.shadow:
                     if self.config._numeric or self.config._delConn:
                         a = conn.address
                     else:

Modified: qpid/trunk/qpid/python/commands/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-stat (original)
+++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb  5 23:02:45 2010
@@ -34,7 +34,6 @@
 _limit = 50
 _increasing = False
 _sortcol = None
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
 
 def Usage ():
     print "Usage:  qpid-stat [OPTIONS] [broker-addr]"
@@ -108,7 +107,7 @@
 
         list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
         for conn in list:
-            if pattern.match(conn.address):
+            if not conn.shadow:
                 self.connections[conn.getObjectId()] = conn
 
         list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb  5 23:02:45 2010
@@ -236,6 +236,7 @@
     <property name="remoteProcessName"  type="sstr"   access="RO" optional="y" desc="Name of executable running as remote client"/>
     <property name="remotePid"          type="uint32" access="RO" optional="y" desc="Process ID of remote client"/>
     <property name="remoteParentPid"    type="uint32" access="RO" optional="y" desc="Parent Process ID of remote client"/>
+    <property name="shadow"             type="bool"   access="RO" desc="True for shadow connections"/>
     <statistic name="closing"          type="bool" desc="This client is closing by management request"/>
     <statistic name="framesFromClient" type="count64"/>
     <statistic name="framesToClient"   type="count64"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Fwd: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/

Posted by Robert Godfrey <ro...@gmail.com>.
This commit broke the Java Build by altering the management-schema.xml
which is used commonly between the two brokers to generate QMF
classes.

Moreover, prior to this commit there was no JIRA or discussion on the
list about a possible change... nor does the commit itself do much to
explain what the change means (although since it relates to clusters,
I can take a fair stab at guessing the answer to "is a connection on
the Java Broker a shadow connection?" is false.)

As I discussed in e-mails at the beginning of the year, I think we
should be aiming at keeping the broker functionality as close a
possible between the two brokers.  This requires all of us to discuss
upcoming proposed changes to things like management-schema.xml before
we commit them so that both communities can have there say, and we can
co-ordinate changes so neither community breaks the other's build.

The schema as-is is significantly deficient for the Java Broker since
it is designed around the limitations of the C++ broker wrt Virtual
Hosts (there can be only one) and transports/ports (similarly
restricted).  However, while I would like to rectify this before our
next release... I will be raising a JIRA and expect to discuss the
impact and co-ordinate timing through the list... That is unless
people would rather I just commit the change... and let the C++ guys
work out how to fix their build after I break it ;-)

BTW thanks to Rajith for putting in an emergency fix to the Java
codebase to make it compile again...

-- Rob

---------- Forwarded message ----------
From:  <ac...@apache.org>
Date: 6 February 2010 00:02
Subject: svn commit: r907123 - in /qpid/trunk/qpid:
cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/
specs/
To: commits@qpid.apache.org


Author: aconway
Date: Fri Feb  5 23:02:45 2010
New Revision: 907123

URL: http://svn.apache.org/viewvc?rev=907123&view=rev
Log:
Consistent connection names across a cluster.

- use the same host:port for connections and their shadows.
- add shadow property to managment connection to identify shadows.
- updated qpid-stat and qpid-cluster to filter on shadow property.

Modified:
   qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
   qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
   qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
   qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
   qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
   qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
   qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
   qpid/trunk/qpid/cpp/xml/cluster.xml
   qpid/trunk/qpid/python/commands/qpid-cluster
   qpid/trunk/qpid/python/commands/qpid-stat
   qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb  5 23:02:45 2010
@@ -72,7 +72,7 @@
    }
 };

-Connection::Connection(ConnectionOutputHandler* out_, Broker&
broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
uint64_t objectId) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker&
broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_,
uint64_t objectId, bool shadow_) :
    ConnectionState(out_, broker_),
    ssf(ssf),
    adapter(*this, isLink_),
@@ -84,7 +84,7 @@
    agent(0),
    timer(broker_.getTimer()),
    errorListener(0),
-    shadow(false)
+    shadow(shadow_)
 {
    Manageable* parent = broker.GetVhostObject();

@@ -95,10 +95,10 @@
    {
        agent = broker_.getManagementAgent();

-
        // TODO set last bool true if system connection
        if (agent != 0) {
            mgmtObject = new _qmf::Connection(agent, this, parent,
mgmtId, !isLink, false);
+            mgmtObject->set_shadow(shadow);
            agent->addObject(mgmtObject, objectId, true);
        }
        ConnectionState::setUrl(mgmtId);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb  5 23:02:45 2010
@@ -79,7 +79,7 @@
    };

    Connection(sys::ConnectionOutputHandler* out, Broker& broker,
const std::string& mgmtId, unsigned int ssf,
-               bool isLink = false, uint64_t objectId = 0);
+               bool isLink = false, uint64_t objectId = 0, bool shadow=false);
    ~Connection ();

    /** Get the SessionHandler for channel. Create if it does not
already exist */
@@ -132,8 +132,6 @@

    /** True if this is a shadow connection in a cluster. */
    bool isShadow() { return shadow; }
-    /** Called by cluster to mark shadow connections */
-    void setShadow() { shadow = true; }

    // Used by cluster to update connection status
    sys::AggregateOutput& getOutputTasks() { return outputTasks; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  5 23:02:45 2010
@@ -509,10 +509,8 @@
            assert(cp);
        }
        else {              // New remote connection, create a shadow.
-            std::ostringstream mgmtId;
            unsigned int ssf = (announce && announce->hasSsf()) ?
announce->getSsf() : 0;
-            mgmtId << id;
-            cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
+            cp = new Connection(*this, shadowOut,
announce->getManagementId(), id, ssf);
        }
        connections.insert(ConnectionMap::value_type(id, cp));
    }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb  5 23:02:45 2010
@@ -23,6 +23,7 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"

+#include "qpid/assert.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -74,28 +75,30 @@


 // Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId,
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+                       const std::string& mgmtId,
                       const ConnectionId& id, unsigned int ssf)
    : cluster(c), self(id), catchUp(false), output(*this, out),
-      connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+      connectionCtor(&output, cluster.getBroker(), mgmtId, ssf,
false, 0, true),
      expectProtocolHeader(false),
      mcastFrameHandler(cluster.getMulticast(), self),
-      consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+      updateIn(c.getUpdateReceiver())
 {}

 // Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& logId, MemberId member,
+                       const std::string& mgmtId, MemberId member,
                       bool isCatchUp, bool isLink, unsigned int ssf
 ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp),
output(*this, out),
    connectionCtor(&output, cluster.getBroker(),
-                   isCatchUp ? shadowPrefix+logId : logId,
+                   mgmtId,
                   ssf,
                   isLink,
-                   isCatchUp ? ++catchUpId : 0),
+                   isCatchUp ? ++catchUpId : 0,
+                   isCatchUp),  // isCatchUp => shadow
    expectProtocolHeader(isLink),
    mcastFrameHandler(cluster.getMulticast(), self),
-    consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+    updateIn(c.getUpdateReceiver())
 {
    cluster.addLocalConnection(this);
    if (isLocalClient()) {
@@ -104,12 +107,14 @@
        QPID_LOG(info, "new client connection " << *this);
        giveReadCredit(cluster.getSettings().readMax);
        cluster.getMulticast().mcastControl(
-            ClusterConnectionAnnounceBody(ProtocolVersion(),
getSsf()), getId());
+            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
getSsf()), getId());
    }
    else {
-        // Catch-up connections initialized immediately.
+        // Catch-up shadow connections initialized using nextShadow id.
        assert(catchUp);
        QPID_LOG(info, "new catch-up connection " << *this);
+        connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+        updateIn.nextShadowMgmtId.clear();
        init();
    }
 }
@@ -127,7 +132,6 @@
        connection->setClusterOrderOutput(nullFrameHandler);
        // Disable client throttling, done by active node.
        connection->setClientThrottling(false);
-        connection->setShadow(); // Mark the connection as a shadow.
    }
    if (!isCatchUp())
        connection->setErrorListener(this);
@@ -138,8 +142,9 @@
        output.giveReadCredit(credit);
 }

-void Connection::announce(uint32_t ssf) {
-    assert(ssf == connectionCtor.ssf);
+void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+    QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
+    QPID_ASSERT(ssf == connectionCtor.ssf);
    init();
 }

@@ -296,13 +301,17 @@
    return sessionState().getSemanticState();
 }

+void Connection::shadowPrepare(const std::string& mgmtId) {
+    updateIn.nextShadowMgmtId = mgmtId;
+}
+
 void Connection::consumerState(const string& name, bool blocked, bool
notifyEnabled, const SequenceNumber& position)
 {
    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
    c.position = position;
    c.setBlocked(blocked);
    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
-    consumerNumbering.add(c.shared_from_this());
+    updateIn.consumerNumbering.add(c.shared_from_this());
 }


@@ -337,10 +346,15 @@
    OutputTask* task = &session->getSemanticState().find(name);
    connection->getOutputTasks().addOutputTask(task);
 }
-
-void Connection::shadowReady(uint64_t memberId, uint64_t
connectionId, const string& username, const string& fragment, uint32_t
sendMax) {
+
+void Connection::shadowReady(
+    uint64_t memberId, uint64_t connectionId, const string& mgmtId,
+    const string& username, const string& fragment, uint32_t sendMax)
+{
+    QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
    ConnectionId shadowId = ConnectionId(memberId, connectionId);
-    QPID_LOG(debug, cluster << " catch-up connection " << *this << "
becomes shadow " << shadowId);
+    QPID_LOG(debug, cluster << " catch-up connection " << *this
+             << " becomes shadow " << shadowId);
    self = shadowId;
    connection->setUserId(username);
    // OK to use decoder here because cluster is stalled for update.
@@ -355,7 +369,7 @@
 {
    QPID_LOG(debug, cluster << " incoming update complete on
connection " << *this);
    cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
-    consumerNumbering.clear();
+    updateIn.consumerNumbering.clear();
    self.second = 0;        // Mark this as completed update connection.
 }

@@ -503,9 +517,9 @@
 }

 void Connection::addQueueListener(const std::string& q, uint32_t listener) {
-    if (listener >= consumerNumbering.size())
+    if (listener >= updateIn.consumerNumbering.size())
        throw Exception(QPID_MSG("Invalid listener ID: " << listener));
-    findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+    findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
 }

 void Connection::managementSchema(const std::string& data) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb  5 23:02:45 2010
@@ -65,10 +65,10 @@
  public:

    /** Local connection. */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
std::string& logId, MemberId, bool catchUp, bool isLink,
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
std::string& mgmtId, MemberId, bool catchUp, bool isLink,
               unsigned int ssf);
    /** Shadow connection. */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
std::string& logId, const ConnectionId& id,
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const
std::string& mgmtId, const ConnectionId& id,
               unsigned int ssf);
    ~Connection();

@@ -109,6 +109,8 @@
    // ==== Used in catch-up mode to build initial state.
    //
    // State update methods.
+    void shadowPrepare(const std::string&);
+
    void sessionState(const framing::SequenceNumber& replayStart,
                      const framing::SequenceNumber& sendCommandPoint,
                      const framing::SequenceSet& sentIncomplete,
@@ -119,7 +121,12 @@

    void outputTask(uint16_t channel, const std::string& name);

-    void shadowReady(uint64_t memberId, uint64_t connectionId, const
std::string& username, const std::string& fragment, uint32_t sendMax);
+    void shadowReady(uint64_t memberId,
+                     uint64_t connectionId,
+                     const std::string& managementId,
+                     const std::string& username,
+                     const std::string& fragment,
+                     uint32_t sendMax);

    void membership(const framing::FieldTable&, const framing::FieldTable&,
                    const framing::SequenceNumber& frameSeq,
@@ -156,7 +163,7 @@
    void exchange(const std::string& encoded);

    void giveReadCredit(int credit);
-    void announce(uint32_t ssf);
+    void announce(const std::string& mgmtId, uint32_t ssf);
    void abort();
    void deliverClose();

@@ -182,6 +189,7 @@
        unsigned int ssf;
        bool isLink;
        uint64_t objectId;
+        bool shadow;

        ConnectionCtor(
            sys::ConnectionOutputHandler* out_,
@@ -189,12 +197,15 @@
            const std::string& mgmtId_,
            unsigned int ssf_,
            bool isLink_=false,
-            uint64_t objectId_=0
-        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
isLink(isLink_), objectId(objectId_) {}
+            uint64_t objectId_=0,
+            bool shadow_=false
+        ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
+            isLink(isLink_), objectId(objectId_), shadow(shadow_)
+        {}

        std::auto_ptr<broker::Connection> construct() {
            return std::auto_ptr<broker::Connection>(
-                new broker::Connection(out, broker, mgmtId, ssf,
isLink, objectId));
+                new broker::Connection(out, broker, mgmtId, ssf,
isLink, objectId, shadow));
        }
    };

@@ -225,7 +236,7 @@
    boost::shared_ptr<broker::TxBuffer> txBuffer;
    bool expectProtocolHeader;
    McastFrameHandler mcastFrameHandler;
-    UpdateReceiver::ConsumerNumbering& consumerNumbering;
+    UpdateReceiver& updateIn;

    static qpid::sys::AtomicValue<uint64_t> catchUpId;


Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb  5
23:02:45 2010
@@ -57,6 +57,7 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
+#include <sstream>

 namespace qpid {
 namespace cluster {
@@ -148,7 +149,7 @@
    ClusterConnectionProxy(session).expiryId(expiry.getId());

    updateManagementAgent();
-
+
    ClusterConnectionMembershipBody membership;
    map.toMethodBody(membership);
    AMQFrame frame(membership);
@@ -328,6 +329,14 @@

 void UpdateClient::updateConnection(const
boost::intrusive_ptr<Connection>& updateConnection) {
    QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+
+    // Send the management ID first on the main connection.
+    std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
+    ClusterConnectionProxy(session).shadowPrepare(mgmtId);
+    // Make sure its received before opening shadow connection
+    session.sync();
+
+    // Open shadow connection and update it.
    shadowConnection = catchUpConnection();

    broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -341,6 +350,7 @@
    ClusterConnectionProxy(shadowConnection).shadowReady(
        updateConnection->getId().getMember(),
        updateConnection->getId().getNumber(),
+        bc.getMgmtId(),
        bc.getUserId(),
        string(fragment.first, fragment.second),
        updateConnection->getOutput().getSendMax()

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb  5
23:02:45 2010
@@ -36,6 +36,9 @@
    /** Numbering used to identify Queue listeners as consumers */
    typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl>
> ConsumerNumbering;
    ConsumerNumbering consumerNumbering;
+
+    /** Management-id for the next shadow connection */
+    std::string nextShadowMgmtId;
 };
 }} // namespace qpid::cluster


Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb  5 23:02:45 2010
@@ -117,6 +117,7 @@

    <!-- Announce a new connection -->
    <control name="announce" code="0x1">
+      <field name="management-id" type="str16"/>
      <!-- Security Strength Factor (ssf): if the transport provides
      encryption (e.g. ssl), ssf is the bit length of the key.  Zero if no
      encryption provided. -->
@@ -135,13 +136,18 @@
    <control name="abort" code="0x4"/>

    <!-- Update controls. Sent to a new broker in joining mode.
-        A connection is updateed as followed:
-        - open as a normal connection.
+        A connection is updated as followed:
+        - send the shadow's management ID in shadow-perpare on the
update connection
+        - open the shadow as a normal connection.
        - attach sessions, create consumers, set flow with normal AMQP
cokmmands.
        - send /reset additional session state with controls below.
        - send shadow-ready to mark end of shadow update.
        - send membership when entire update is complete.
    -->
+    <!-- Prepare to send a shadow connection with the given ID. -->
+    <control name="shadow-prepare" code="0x0F">
+      <field name="management-id" type="str16"/>
+    </control>

    <!-- Consumer state that cannot be set by standard AMQP controls. -->
    <control name="consumer-state" code="0x10">
@@ -202,6 +208,7 @@
    <control name="shadow-ready" code="0x20" label="End of shadow
connection update.">
      <field name="member-id" type="uint64"/>
      <field name="connection-id" type="uint64"/>
+      <field name="management-id" type="str16"/>
      <field name="user-name" type="str8"/>
      <field name="fragment" type="str32"/>
      <field name="send-max" type="uint32"/>

Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb  5 23:02:45 2010
@@ -193,7 +193,6 @@
        self.qmf.delBroker(self.broker)
        self.broker = None
        self.brokers = []
-        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")

        idx = 0
        for host in hostList:
@@ -209,7 +208,7 @@
                print "Clients on Member: ID=%s:" % displayList[idx]
            connList = self.qmf.getObjects(_class="connection",
_package="org.apache.qpid.broker", _broker=broker)
            for conn in connList:
-                if pattern.match(conn.address):
+                if not conn.shadow:
                    if self.config._numeric or self.config._delConn:
                        a = conn.address
                    else:

Modified: qpid/trunk/qpid/python/commands/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-stat (original)
+++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb  5 23:02:45 2010
@@ -34,7 +34,6 @@
 _limit = 50
 _increasing = False
 _sortcol = None
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")

 def Usage ():
    print "Usage:  qpid-stat [OPTIONS] [broker-addr]"
@@ -108,7 +107,7 @@

        list = qmf.getObjects(_class="connection", _package=package,
_agent=self.brokerAgent)
        for conn in list:
-            if pattern.match(conn.address):
+            if not conn.shadow:
                self.connections[conn.getObjectId()] = conn

        list = qmf.getObjects(_class="session", _package=package,
_agent=self.brokerAgent)

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb  5 23:02:45 2010
@@ -236,6 +236,7 @@
    <property name="remoteProcessName"  type="sstr"   access="RO"
optional="y" desc="Name of executable running as remote client"/>
    <property name="remotePid"          type="uint32" access="RO"
optional="y" desc="Process ID of remote client"/>
    <property name="remoteParentPid"    type="uint32" access="RO"
optional="y" desc="Parent Process ID of remote client"/>
+    <property name="shadow"             type="bool"   access="RO"
desc="True for shadow connections"/>
    <statistic name="closing"          type="bool" desc="This client
is closing by management request"/>
    <statistic name="framesFromClient" type="count64"/>
    <statistic name="framesToClient"   type="count64"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org