You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/01 15:47:05 UTC

svn commit: r790163 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Wed Jul  1 13:47:04 2009
New Revision: 790163

URL: http://svn.apache.org/viewvc?rev=790163&view=rev
Log:
Fix members joining cluster while cluster is handling client errors.

Previously cluster members could abort if a new member joins while
existing members are handling a client error.

Now if an update offer arrives while an error is in progress, the
offering broker retracts the offer and the newcomer must try again.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Jul  1 13:47:04 2009
@@ -56,6 +56,8 @@
   qpid/cluster/Dispatchable.h			\
   qpid/cluster/UpdateClient.cpp			\
   qpid/cluster/UpdateClient.h			\
+  qpid/cluster/RetractClient.cpp		\
+  qpid/cluster/RetractClient.h			\
   qpid/cluster/ErrorCheck.cpp			\
   qpid/cluster/ErrorCheck.h			\
   qpid/cluster/Event.cpp			\

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul  1 13:47:04 2009
@@ -88,6 +88,7 @@
 #include "ClusterSettings.h"
 #include "Connection.h"
 #include "UpdateClient.h"
+#include "RetractClient.h"
 #include "FailoverExchange.h"
 #include "UpdateExchange.h"
 
@@ -104,6 +105,7 @@
 #include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionAbortBody.h"
+#include "qpid/framing/ClusterRetractOfferBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
@@ -152,6 +154,7 @@
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& current) { cluster.configChange(member, current, l); }
     void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+    void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
     void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
 
@@ -186,6 +189,7 @@
     state(INIT),
     lastSize(0),
     lastBroker(false),
+    updateRetracted(false),
     error(*this)
 {
     mAgent = broker.getManagementAgent();
@@ -325,6 +329,12 @@
     deliverFrameQueue.push(e);
 }
 
+const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
+    return  (body && body->getMethod() &&
+             body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
+        static_cast<const ClusterUpdateOfferBody*>(body) : 0;
+}
+ 
 // Handler for deliverEventQueue.
 // This thread decodes frames from events.
 void Cluster::deliveredEvent(const Event& e) {
@@ -334,8 +344,7 @@
         EventFrame ef(e, e.getFrame());
         // Stop the deliverEventQueue on update offers.
         // This preserves the connection decoder fragments for an update.
-        ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
-        if (offer)
+        if (castUpdateOffer(ef.frame.getBody()))
             deliverEventQueue.stop();
         deliverFrame(ef);
     }
@@ -357,20 +366,37 @@
         QPID_LOG(trace, *this << " DROP: " << e);
 }
 
-void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+void Cluster::flagError(
+    Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
+{
     Mutex::ScopedLock l(lock);
-    if (settings.checkErrors)
-        error.error(connection, type, map.getFrameSeq(), map.getMembers());
+    if (connection.isCatchUp()) {
+        QPID_LOG(critical, *this << " error on update connection " << connection
+                 << ": " << msg);
+        leave(l);
+    }
+    else if (settings.checkErrors)
+        error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
 }
 
 LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
 
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
-void Cluster::deliveredFrame(const EventFrame& e) {
+    void Cluster::deliveredFrame(const EventFrame& efConst) {
     LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
     LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
     Mutex::ScopedLock l(lock);
+    EventFrame e(efConst);
+    const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
+    if (offer && error.isUnresolved()) {
+        // We can't honour an update offer that is delivered while an
+        // error is in progress so replace it with a retractOffer and re-start
+        // the event queue.
+        e.frame = AMQFrame(
+            ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
+        deliverEventQueue.start(); 
+    }
     // Process each frame through the error checker.
     if (settings.checkErrors) {
         error.delivered(e);
@@ -382,7 +408,6 @@
     }
 }
 
-LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
 
 void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
@@ -562,6 +587,14 @@
     }
 }
 
+// Go back to normal processing after an offer that did not result in an update.  
+void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {    
+    QPID_LOG(info, *this << " cancelled offer to " << updatee);
+    deliverEventQueue.start(); // Go back to normal processing
+    setReady(l);
+    makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+}
+ 
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
     // NOTE: deliverEventQueue has been stopped at the update offer by
     // deliveredEvent in case an update is required.
@@ -572,12 +605,8 @@
         assert(state == OFFER);
         if (url)               // My offer was first.
             updateStart(updatee, *url, l);
-        else {                  // Another offer was first.
-            deliverEventQueue.start(); // Don't need to update
-            setReady(l);
-            QPID_LOG(info, *this << " cancelled update offer to " << updatee);
-            makeOffer(map.firstJoiner(), l); // Maybe make another offer.
-        }
+        else                   // Another offer was first.
+            cancelOffer(updatee, l);
     }
     else if (updatee == self && url) {
         assert(state == JOINER);
@@ -587,7 +616,34 @@
         checkUpdateIn(l);
     }
     else
-        deliverEventQueue.start(); // Don't need to update
+        deliverEventQueue.start(); // Not involved in update.
+}
+
+static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
+    client::ConnectionSettings cs;
+    cs.username = settings.username;
+    cs.password = settings.password;
+    cs.mechanism = settings.mechanism;
+    return cs;
+}
+
+void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
+    // An offer was received while handling an error, and converted to a retract.
+    if (state == LEFT) return;
+    MemberId updatee(updateeInt);
+    boost::optional<Url> url = map.updateOffer(updater, updatee);
+    if (updater == self) {
+        assert(state == OFFER);
+        if (url)  {             // My offer was first.
+            QPID_LOG(info, *this << " retracted offer to " << updatee);
+            if (updateThread.id())
+                updateThread.join(); // Join the previous updateThread to avoid leaks.
+            updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
+        }
+        cancelOffer(updatee, l);
+    }
+    else
+        deliverEventQueue.start(); // Not involved in update.
 }
 
 void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -598,15 +654,12 @@
     QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
     if (updateThread.id())
         updateThread.join(); // Join the previous updateThread to avoid leaks.
-    client::ConnectionSettings cs;
-    cs.username = settings.username;
-    cs.password = settings.password;
-    cs.mechanism = settings.mechanism;
     updateThread = Thread(
-        new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
+        new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
+                         getConnections(l), decoder,
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
-                         cs));
+                         connectionSettings(settings)));
 }
 
 // Called in update thread.
@@ -616,8 +669,15 @@
     checkUpdateIn(l);
 }
 
+void Cluster::updateInRetracted() {
+    Lock l(lock);
+    updateRetracted = true;
+    checkUpdateIn(l);
+}
+
 void Cluster::checkUpdateIn(Lock&) {
-    if (state == UPDATEE && updatedMap) {
+    if (state != UPDATEE) return; // Wait till we reach the stall point.
+    if (updatedMap) { // We're up to date
         map = *updatedMap;
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
@@ -625,6 +685,13 @@
         QPID_LOG(info, *this << " received update, starting catch-up");
         deliverEventQueue.start();
     }
+    else if (updateRetracted) { // Update was retracted, request another update
+        updateRetracted = false;
+        state = JOINER;
+        QPID_LOG(info, *this << " re-try joining after retracted update");
+        mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+        deliverEventQueue.start();
+    }
 }
 
 void Cluster::updateOutDone() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jul  1 13:47:04 2009
@@ -94,6 +94,7 @@
 
     // Update completed - called in update thread
     void updateInDone(const ClusterMap&);
+    void updateInRetracted();
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -106,7 +107,7 @@
     void deliverFrame(const EventFrame&);
 
     // Called in deliverFrame thread to indicate an error from the broker.
-    void flagError(Connection&, ErrorCheck::ErrorType);
+    void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg);
     void connectionError();
 
     // Called only during update by Connection::shadowReady
@@ -141,6 +142,7 @@
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
+    void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -157,6 +159,7 @@
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
     void erase(const ConnectionId&, Lock&);       
+    void cancelOffer(const MemberId&, Lock&);
 
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 
@@ -251,6 +254,7 @@
     bool lastBroker;
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;
+    bool updateRetracted;
     ErrorCheck error;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jul  1 13:47:04 2009
@@ -309,6 +309,12 @@
     self.second = 0;        // Mark this as completed update connection.
 }
 
+void Connection::retractOffer() {
+    QPID_LOG(debug, cluster << " incoming update retracted on connection " << *this);
+    cluster.updateInRetracted();
+    self.second = 0;        // Mark this as completed update connection.
+}
+
 bool Connection::isLocal() const {
     return self.first == cluster.getId() && self.second;
 }
@@ -435,13 +441,13 @@
     QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
 }
 
-void Connection::sessionError(uint16_t , const std::string& ) {
-    cluster.flagError(*this, ERROR_TYPE_SESSION);
+void Connection::sessionError(uint16_t , const std::string& msg) {
+    cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
     
 }
 
-void Connection::connectionError(const std::string& ) {
-    cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+void Connection::connectionError(const std::string& msg) {
+    cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jul  1 13:47:04 2009
@@ -121,6 +121,8 @@
 
     void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
 
+    void retractOffer();
+
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
                         const std::string& tag,

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Wed Jul  1 13:47:04 2009
@@ -44,7 +44,8 @@
     return o;
 }
 
-void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+void ErrorCheck::error(
+    Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
 {
     // Detected a local error, inform cluster and set error state.
     assert(t != ERROR_TYPE_NONE); // Must be an error.
@@ -53,8 +54,10 @@
     unresolved = ms;
     frameSeq = seq;
     connection = &c;
-    QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
-             << " error " << frameSeq << " unresolved: " << unresolved);
+    QPID_LOG(error, cluster
+             << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
+             << " error " << frameSeq << " on " << c << ": " << msg
+             << " (unresolved: " << unresolved << ")");
     mcast.mcastControl(
         ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
 }
@@ -67,11 +70,13 @@
                 e.frame.getMethod());
         if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
             if (errorCheck->getType() < type) { // my error is worse than his
-                QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+                QPID_LOG(critical, cluster << " error " << frameSeq
+                         << " did not occur on " << e.getMemberId());
                 throw Exception("Aborted by local failure that did not occur on all replicas");
             }
             else {              // his error is worse/same as mine.
-                QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+                QPID_LOG(debug, cluster << " error " << frameSeq
+                         << " outcome agrees with " << e.getMemberId());
                 unresolved.erase(e.getMemberId());
                 checkResolved();
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Wed Jul  1 13:47:04 2009
@@ -53,7 +53,8 @@
     ErrorCheck(Cluster&);
 
     /** A local error has occured */
-    void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+    void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+               const std::string& msg);
 
     /** Called when a frame is delivered */
     void delivered(const EventFrame&);

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp?rev=790163&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp Wed Jul  1 13:47:04 2009
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RetractClient.h"
+#include "UpdateClient.h"
+#include "qpid/framing/ClusterConnectionRetractOfferBody.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+namespace {
+
+struct AutoClose {
+    client::Connection& connection;
+    AutoClose(client::Connection& c) : connection(c) {}
+    ~AutoClose() { connection.close(); }
+};
+}
+
+RetractClient::RetractClient(const Url& u, const client::ConnectionSettings& cs)
+    : url(u), connectionSettings(cs)
+{}
+
+RetractClient::~RetractClient() { delete this; }
+
+
+void RetractClient::run() {
+    try {
+        client::Connection c = UpdateClient::catchUpConnection();
+        c.open(url, connectionSettings);
+        AutoClose ac(c);
+        AMQFrame retract((ClusterConnectionRetractOfferBody()));
+        client::ConnectionAccess::getImpl(c)->handle(retract);
+    } catch (const std::exception& e) {
+        QPID_LOG(error, " while retracting retract to " << url << ": " << e.what()); 
+    }
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h?rev=790163&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h Wed Jul  1 13:47:04 2009
@@ -0,0 +1,49 @@
+#ifndef QPID_CLUSTER_RETRACTCLIENT_H
+#define QPID_CLUSTER_RETRACTCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Runnable.h"
+
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A client that retracts an offer to a remote broker using AMQP. @see UpdateClient
+ */
+class RetractClient : public sys::Runnable {
+  public:
+
+    RetractClient(const Url&, const client::ConnectionSettings&);
+    ~RetractClient();
+    void run();                 // Will delete this when finished.
+
+  private:
+    Url url;
+    client::ConnectionSettings connectionSettings;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_RETRACTCLIENT_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jul  1 13:47:04 2009
@@ -79,7 +79,7 @@
 };
 
 // Create a connection with special version that marks it as a catch-up connection.
-client::Connection catchUpConnection() {
+client::Connection UpdateClient::catchUpConnection() {
     client::Connection c;
     client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
     return c;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Wed Jul  1 13:47:04 2009
@@ -65,6 +65,7 @@
 class UpdateClient : public sys::Runnable {
   public:
     static const std::string UPDATE; // Name for special update queue and exchange.
+    static client::Connection catchUpConnection();
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
                  broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp Wed Jul  1 13:47:04 2009
@@ -23,7 +23,7 @@
 #include "TestOptions.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
@@ -63,7 +63,7 @@
         try {
             opts.open(connection);
             if (!opts.quiet) cout << "Opened connection." << endl;
-            Session s = connection.newSession();
+            AsyncSession s = connection.newSession();
             string qname(Uuid(true).str());
             s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true);
             s.messageTransfer(arg::content=Message("hello", qname));
@@ -71,6 +71,7 @@
             SubscriptionManager subs(s);
             subs.get(qname);
             if (!opts.quiet) cout << "Received message." << endl;
+            s.sync();
             s.close();
             connection.close();
             Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jul  1 13:47:04 2009
@@ -27,16 +27,24 @@
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name="update-request" code="0x1" label="URL for a member.">
+    <!-- New joiner requests an update to url. -->
+    <control name="update-request" code="0x1">
       <field name="url" type="str16"/>
     </control>
 
-    <control name = "update-offer" code="0x2" label="Member offering to be updater for updatee.">
+    <!-- Sender offers an update to a new joiner. -->
+    <control name = "update-offer" code="0x2">
       <field name="updatee" type="uint64"/>
       <field name="cluster-id" type="uuid"/>
     </control>
 
-    <control name="ready" code="0x10" label="New member is ready.">
+    <!-- Sender retracts an offer to a new joiner. -->
+    <control name = "retract-offer" code="0x3">
+      <field name="updatee" type="uint64"/>
+    </control>
+
+    <!-- New member or updater is ready as an active member. -->
+    <control name="ready" code="0x10">
       <field name="url" type="str16"/>
     </control>
 
@@ -91,7 +99,7 @@
 	 - attach sessions, create consumers, set flow with normal AMQP cokmmands.
 	 - send /reset additional session state with controls below.
 	 - send shadow-ready to mark end of shadow update.
-	 - send update-complete when entire update is complete.
+	 - send membership when entire update is complete.
     -->
     
     <!-- Consumer state that cannot be set by standard AMQP controls. -->
@@ -166,6 +174,9 @@
       <field name="frame-seq" type="uint64"/>	 <!-- frame sequence number -->
     </control>
 
+    <!-- Updater cannot fulfill an update offer. -->
+    <control name = "retract-offer" code="0x22"/>
+
     <!-- Set the position of a replicated queue. -->
     <control name="queue-position" code="0x30">
       <field name="queue" type="str8"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org