You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/01 15:47:05 UTC
svn commit: r790163 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/tests/ xml/
Author: aconway
Date: Wed Jul 1 13:47:04 2009
New Revision: 790163
URL: http://svn.apache.org/viewvc?rev=790163&view=rev
Log:
Fix members joining cluster while cluster is handling client errors.
Previously cluster members could abort if a new member joins while
existing members are handling a client error.
Now if an update offer arrives while an error is in progress, the
offering broker retracts the offer and the newcomer must try again.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp (with props)
qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h (with props)
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Jul 1 13:47:04 2009
@@ -56,6 +56,8 @@
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
+ qpid/cluster/RetractClient.cpp \
+ qpid/cluster/RetractClient.h \
qpid/cluster/ErrorCheck.cpp \
qpid/cluster/ErrorCheck.h \
qpid/cluster/Event.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 1 13:47:04 2009
@@ -88,6 +88,7 @@
#include "ClusterSettings.h"
#include "Connection.h"
#include "UpdateClient.h"
+#include "RetractClient.h"
#include "FailoverExchange.h"
#include "UpdateExchange.h"
@@ -104,6 +105,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
+#include "qpid/framing/ClusterRetractOfferBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
@@ -152,6 +154,7 @@
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
@@ -186,6 +189,7 @@
state(INIT),
lastSize(0),
lastBroker(false),
+ updateRetracted(false),
error(*this)
{
mAgent = broker.getManagementAgent();
@@ -325,6 +329,12 @@
deliverFrameQueue.push(e);
}
+const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
+ return (body && body->getMethod() &&
+ body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
+ static_cast<const ClusterUpdateOfferBody*>(body) : 0;
+}
+
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
@@ -334,8 +344,7 @@
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
- if (offer)
+ if (castUpdateOffer(ef.frame.getBody()))
deliverEventQueue.stop();
deliverFrame(ef);
}
@@ -357,20 +366,37 @@
QPID_LOG(trace, *this << " DROP: " << e);
}
-void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+void Cluster::flagError(
+ Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
+{
Mutex::ScopedLock l(lock);
- if (settings.checkErrors)
- error.error(connection, type, map.getFrameSeq(), map.getMembers());
+ if (connection.isCatchUp()) {
+ QPID_LOG(critical, *this << " error on update connection " << connection
+ << ": " << msg);
+ leave(l);
+ }
+ else if (settings.checkErrors)
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
// Handler for deliverFrameQueue.
// This thread executes the main logic.
-void Cluster::deliveredFrame(const EventFrame& e) {
+void Cluster::deliveredFrame(const EventFrame& efConst) {
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ EventFrame e(efConst);
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
+ if (offer && error.isUnresolved()) {
+ // We can't honour an update offer that is delivered while an
+ // error is in progress so replace it with a retractOffer and re-start
+ // the event queue.
+ e.frame = AMQFrame(
+ ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
+ deliverEventQueue.start();
+ }
// Process each frame through the error checker.
if (settings.checkErrors) {
error.delivered(e);
@@ -382,7 +408,6 @@
}
}
-LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
@@ -562,6 +587,14 @@
}
}
+// Go back to normal processing after an offer that did not result in an update.
+void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {
+ QPID_LOG(info, *this << " cancelled offer to " << updatee);
+ deliverEventQueue.start(); // Go back to normal processing
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
@@ -572,12 +605,8 @@
assert(state == OFFER);
if (url) // My offer was first.
updateStart(updatee, *url, l);
- else { // Another offer was first.
- deliverEventQueue.start(); // Don't need to update
- setReady(l);
- QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
- }
+ else // Another offer was first.
+ cancelOffer(updatee, l);
}
else if (updatee == self && url) {
assert(state == JOINER);
@@ -587,7 +616,34 @@
checkUpdateIn(l);
}
else
- deliverEventQueue.start(); // Don't need to update
+ deliverEventQueue.start(); // Not involved in update.
+}
+
+static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
+ client::ConnectionSettings cs;
+ cs.username = settings.username;
+ cs.password = settings.password;
+ cs.mechanism = settings.mechanism;
+ return cs;
+}
+
+void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
+ // An offer was received while handling an error, and converted to a retract.
+ if (state == LEFT) return;
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee);
+ if (updater == self) {
+ assert(state == OFFER);
+ if (url) { // My offer was first.
+ QPID_LOG(info, *this << " retracted offer to " << updatee);
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
+ updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
+ }
+ cancelOffer(updatee, l);
+ }
+ else
+ deliverEventQueue.start(); // Not involved in update.
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -598,15 +654,12 @@
QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
- client::ConnectionSettings cs;
- cs.username = settings.username;
- cs.password = settings.password;
- cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
+ getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs));
+ connectionSettings(settings)));
}
// Called in update thread.
@@ -616,8 +669,15 @@
checkUpdateIn(l);
}
+void Cluster::updateInRetracted() {
+ Lock l(lock);
+ updateRetracted = true;
+ checkUpdateIn(l);
+}
+
void Cluster::checkUpdateIn(Lock&) {
- if (state == UPDATEE && updatedMap) {
+ if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (updatedMap) { // We're up to date
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
@@ -625,6 +685,13 @@
QPID_LOG(info, *this << " received update, starting catch-up");
deliverEventQueue.start();
}
+ else if (updateRetracted) { // Update was retracted, request another update
+ updateRetracted = false;
+ state = JOINER;
+ QPID_LOG(info, *this << " re-try joining after retracted update");
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ deliverEventQueue.start();
+ }
}
void Cluster::updateOutDone() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jul 1 13:47:04 2009
@@ -94,6 +94,7 @@
// Update completed - called in update thread
void updateInDone(const ClusterMap&);
+ void updateInRetracted();
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -106,7 +107,7 @@
void deliverFrame(const EventFrame&);
// Called in deliverFrame thread to indicate an error from the broker.
- void flagError(Connection&, ErrorCheck::ErrorType);
+ void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg);
void connectionError();
// Called only during update by Connection::shadowReady
@@ -141,6 +142,7 @@
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
+ void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -157,6 +159,7 @@
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
+ void cancelOffer(const MemberId&, Lock&);
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
@@ -251,6 +254,7 @@
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
+ bool updateRetracted;
ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jul 1 13:47:04 2009
@@ -309,6 +309,12 @@
self.second = 0; // Mark this as completed update connection.
}
+void Connection::retractOffer() {
+ QPID_LOG(debug, cluster << " incoming update retracted on connection " << *this);
+ cluster.updateInRetracted();
+ self.second = 0; // Mark this as completed update connection.
+}
+
bool Connection::isLocal() const {
return self.first == cluster.getId() && self.second;
}
@@ -435,13 +441,13 @@
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-void Connection::sessionError(uint16_t , const std::string& ) {
- cluster.flagError(*this, ERROR_TYPE_SESSION);
+void Connection::sessionError(uint16_t , const std::string& msg) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
}
-void Connection::connectionError(const std::string& ) {
- cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+void Connection::connectionError(const std::string& msg) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jul 1 13:47:04 2009
@@ -121,6 +121,8 @@
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+ void retractOffer();
+
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
const std::string& tag,
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Wed Jul 1 13:47:04 2009
@@ -44,7 +44,8 @@
return o;
}
-void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+void ErrorCheck::error(
+ Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
@@ -53,8 +54,10 @@
unresolved = ms;
frameSeq = seq;
connection = &c;
- QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
- << " error " << frameSeq << " unresolved: " << unresolved);
+ QPID_LOG(error, cluster
+ << (type == ERROR_TYPE_SESSION ? " channel" : " connection")
+ << " error " << frameSeq << " on " << c << ": " << msg
+ << " (unresolved: " << unresolved << ")");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
}
@@ -67,11 +70,13 @@
e.frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
if (errorCheck->getType() < type) { // my error is worse than his
- QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+ QPID_LOG(critical, cluster << " error " << frameSeq
+ << " did not occur on " << e.getMemberId());
throw Exception("Aborted by local failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
- QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+ QPID_LOG(debug, cluster << " error " << frameSeq
+ << " outcome agrees with " << e.getMemberId());
unresolved.erase(e.getMemberId());
checkResolved();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Wed Jul 1 13:47:04 2009
@@ -53,7 +53,8 @@
ErrorCheck(Cluster&);
/** A local error has occured */
- void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+ const std::string& msg);
/** Called when a frame is delivered */
void delivered(const EventFrame&);
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp?rev=790163&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp Wed Jul 1 13:47:04 2009
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RetractClient.h"
+#include "UpdateClient.h"
+#include "qpid/framing/ClusterConnectionRetractOfferBody.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+namespace {
+
+struct AutoClose {
+ client::Connection& connection;
+ AutoClose(client::Connection& c) : connection(c) {}
+ ~AutoClose() { connection.close(); }
+};
+}
+
+RetractClient::RetractClient(const Url& u, const client::ConnectionSettings& cs)
+ : url(u), connectionSettings(cs)
+{}
+
+RetractClient::~RetractClient() { delete this; }
+
+
+void RetractClient::run() {
+ try {
+ client::Connection c = UpdateClient::catchUpConnection();
+ c.open(url, connectionSettings);
+ AutoClose ac(c);
+ AMQFrame retract((ClusterConnectionRetractOfferBody()));
+ client::ConnectionAccess::getImpl(c)->handle(retract);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, " while retracting retract to " << url << ": " << e.what());
+ }
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h?rev=790163&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h Wed Jul 1 13:47:04 2009
@@ -0,0 +1,49 @@
+#ifndef QPID_CLUSTER_RETRACTCLIENT_H
+#define QPID_CLUSTER_RETRACTCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Runnable.h"
+
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A client that retracts an offer to a remote broker using AMQP. @see UpdateClient
+ */
+class RetractClient : public sys::Runnable {
+ public:
+
+ RetractClient(const Url&, const client::ConnectionSettings&);
+ ~RetractClient();
+ void run(); // Will delete this when finished.
+
+ private:
+ Url url;
+ client::ConnectionSettings connectionSettings;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_RETRACTCLIENT_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/RetractClient.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jul 1 13:47:04 2009
@@ -79,7 +79,7 @@
};
// Create a connection with special version that marks it as a catch-up connection.
-client::Connection catchUpConnection() {
+client::Connection UpdateClient::catchUpConnection() {
client::Connection c;
client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
return c;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Wed Jul 1 13:47:04 2009
@@ -65,6 +65,7 @@
class UpdateClient : public sys::Runnable {
public:
static const std::string UPDATE; // Name for special update queue and exchange.
+ static client::Connection catchUpConnection();
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp Wed Jul 1 13:47:04 2009
@@ -23,7 +23,7 @@
#include "TestOptions.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -63,7 +63,7 @@
try {
opts.open(connection);
if (!opts.quiet) cout << "Opened connection." << endl;
- Session s = connection.newSession();
+ AsyncSession s = connection.newSession();
string qname(Uuid(true).str());
s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true);
s.messageTransfer(arg::content=Message("hello", qname));
@@ -71,6 +71,7 @@
SubscriptionManager subs(s);
subs.get(qname);
if (!opts.quiet) cout << "Received message." << endl;
+ s.sync();
s.close();
connection.close();
Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=790163&r1=790162&r2=790163&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jul 1 13:47:04 2009
@@ -27,16 +27,24 @@
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name="update-request" code="0x1" label="URL for a member.">
+ <!-- New joiner requests an update to url. -->
+ <control name="update-request" code="0x1">
<field name="url" type="str16"/>
</control>
- <control name = "update-offer" code="0x2" label="Member offering to be updater for updatee.">
+ <!-- Sender offers an update to a new joiner. -->
+ <control name = "update-offer" code="0x2">
<field name="updatee" type="uint64"/>
<field name="cluster-id" type="uuid"/>
</control>
- <control name="ready" code="0x10" label="New member is ready.">
+ <!-- Sender retracts an offer to a new joiner. -->
+ <control name = "retract-offer" code="0x3">
+ <field name="updatee" type="uint64"/>
+ </control>
+
+ <!-- New member or updater is ready as an active member. -->
+ <control name="ready" code="0x10">
<field name="url" type="str16"/>
</control>
@@ -91,7 +99,7 @@
- attach sessions, create consumers, set flow with normal AMQP cokmmands.
- send /reset additional session state with controls below.
- send shadow-ready to mark end of shadow update.
- - send update-complete when entire update is complete.
+ - send membership when entire update is complete.
-->
<!-- Consumer state that cannot be set by standard AMQP controls. -->
@@ -166,6 +174,9 @@
<field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
</control>
+ <!-- Updater cannot fulfill an update offer. -->
+ <control name = "retract-offer" code="0x22"/>
+
<!-- Set the position of a replicated queue. -->
<control name="queue-position" code="0x30">
<field name="queue" type="str8"/>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org