You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/29 22:47:22 UTC
svn commit: r739031 - in /qpid/trunk/qpid/cpp/src/qpid:
amqp_0_10/SessionHandler.cpp amqp_0_10/SessionHandler.h
broker/SessionHandler.cpp cluster/Cluster.cpp cluster/Cluster.h
cluster/UpdateClient.cpp
Author: aconway
Date: Thu Jan 29 21:47:21 2009
New Revision: 739031
URL: http://svn.apache.org/viewvc?rev=739031&view=rev
Log:
Better error messages for not-attached exceptions.
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Thu Jan 29 21:47:21 2009
@@ -34,6 +34,8 @@
using namespace framing;
using namespace std;
+#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached"))
+
SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
: channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
@@ -61,14 +63,6 @@
} // namespace
-void SessionHandler::checkAttached() {
- if (!getState())
- throw NotAttachedException(
- QPID_MSG("Channel " << channel.get() << " is not attached"));
- assert(getInHandler());
- assert(channel.next);
-}
-
void SessionHandler::invoke(const AMQMethodBody& m) {
framing::invoke(*this, m);
}
@@ -82,7 +76,7 @@
else if (isSessionControl(m))
invoke(*m);
else {
- checkAttached();
+ CHECK_ATTACHED("receiving " << f);
if (!receiveReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
@@ -126,7 +120,7 @@
} // namespace
void SessionHandler::handleOut(AMQFrame& f) {
- checkAttached();
+ CHECK_ATTACHED("sending " << f);
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
getState()->senderRecord(f);
@@ -137,14 +131,6 @@
channel.handle(f);
}
-void SessionHandler::checkName(const std::string& name) {
- checkAttached();
- if (name != getState()->getId().getName())
- throw InvalidArgumentException(
- QPID_MSG("Incorrect session name: " << name
- << ", expecting: " << getState()->getId().getName()));
-}
-
void SessionHandler::attach(const std::string& name_, bool force) {
// Save the name for possible session-busy exception. Session-busy
// can be thrown before we have attached the handler to a valid
@@ -164,18 +150,27 @@
sendCommandPoint(getState()->senderGetCommandPoint());
}
+#define CHECK_NAME(NAME, MSG) do { \
+ CHECK_ATTACHED(MSG); \
+ if (NAME != getState()->getId().getName()) \
+ throw InvalidArgumentException( \
+ QPID_MSG(MSG << ": incorrect session name: " << NAME \
+ << ", expecting: " << getState()->getId().getName())); \
+ } while(0)
+
+
void SessionHandler::attached(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.attached");
}
void SessionHandler::detach(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.detach");
peer.detached(name, session::DETACH_CODE_NORMAL);
handleDetach();
}
void SessionHandler::detached(const std::string& name, uint8_t code) {
- checkName(name);
+ CHECK_NAME(name, "session.detached");
ignoring = false;
if (code != session::DETACH_CODE_NORMAL)
channelException(convert(code), "session.detached from peer.");
@@ -189,18 +184,18 @@
}
void SessionHandler::requestTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
peer.timeout(t);
}
void SessionHandler::timeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
}
void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
- checkAttached();
+ CHECK_ATTACHED("session.command-point");
getState()->receiverSetCommandPoint(SessionPoint(id, offset));
if (!receiveReady) {
receiveReady = true;
@@ -209,7 +204,7 @@
}
void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.expected");
if (getState()->hasState()) { // Replay
if (commands.empty()) throw IllegalStateException(
QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));
@@ -225,14 +220,14 @@
}
void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.confirmed");
// Ignore non-contiguous confirmations.
if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint())
getState()->senderConfirmed(commands.rangesBegin()->last());
}
void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
- checkAttached();
+ CHECK_ATTACHED("session.completed");
getState()->senderCompleted(commands);
if (getState()->senderNeedKnownCompleted() || timelyReply) {
peer.knownCompleted(commands);
@@ -241,12 +236,12 @@
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
- checkAttached();
+ CHECK_ATTACHED("session.known-completed");
getState()->receiverKnownCompleted(commands);
}
void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
- checkAttached();
+ CHECK_ATTACHED("session.flush");
if (expected) {
SequenceSet expectSet;
if (getState()->hasState())
@@ -270,19 +265,19 @@
void SessionHandler::sendDetach()
{
- checkAttached();
+ CHECK_ATTACHED("session.sendDetach");
ignoring = true;
peer.detach(getState()->getId().getName());
}
void SessionHandler::sendCompletion() {
- checkAttached();
+ CHECK_ATTACHED("session.send-completion");
const SequenceSet& c = getState()->receiverGetUnknownComplete();
peer.completed(c, getState()->receiverNeedKnownCompleted());
}
void SessionHandler::sendAttach(bool force) {
- checkAttached();
+ CHECK_ATTACHED("session.send-attach");
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
@@ -306,7 +301,7 @@
}
void SessionHandler::sendTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.send-timeout");
peer.requestTimeout(t);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Thu Jan 29 21:47:21 2009
@@ -98,9 +98,6 @@
virtual void handleIn(framing::AMQFrame&);
virtual void handleOut(framing::AMQFrame&);
- void checkAttached();
- void checkName(const std::string& name);
-
framing::ChannelHandler channel;
framing::AMQP_AllProxy::Session peer;
bool ignoring;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Thu Jan 29 21:47:21 2009
@@ -90,7 +90,7 @@
//
void SessionHandler::attached(const std::string& name) {
if (session.get()) {
- checkName(name);
+ amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan 29 21:47:21 2009
@@ -65,12 +65,6 @@
using qpid::management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
-/**@file
- Threading notes:
- - Public functions may be called in local connection IO threads.
- see .h.
-*/
-
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
MemberId member;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan 29 21:47:21 2009
@@ -58,9 +58,6 @@
/**
* Connection to the cluster
- *
- * Threading notes: 3 thread categories: connection, deliver, update.
- *
*/
class Cluster : private Cpg::Handler, public management::Manageable {
public:
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Jan 29 21:47:21 2009
@@ -104,7 +104,7 @@
const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS));
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl);
+ QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -144,7 +144,7 @@
} // namespace
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName());
+ QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
ClusterConnectionProxy proxy(session);
proxy.exchange(encode(*ex));
}
@@ -187,7 +187,7 @@
void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(debug, updaterId << " updateing queue " << q->getName());
+ QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
MessageUpdater updater(q->getName(), session);
@@ -201,7 +201,7 @@
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection);
+ QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -216,7 +216,7 @@
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
+ QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
@@ -230,10 +230,10 @@
// Re-create session state on remote connection.
// Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- QPID_LOG(debug, updaterId << " updateing consumers.");
+ QPID_LOG(debug, updaterId << " updating consumers.");
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
- QPID_LOG(debug, updaterId << " updateing unacknowledged messages.");
+ QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -267,7 +267,7 @@
}
void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -354,7 +354,7 @@
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updateing TX transaction state.");
+ QPID_LOG(debug, updaterId << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org