You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/29 22:47:22 UTC

svn commit: r739031 - in /qpid/trunk/qpid/cpp/src/qpid: amqp_0_10/SessionHandler.cpp amqp_0_10/SessionHandler.h broker/SessionHandler.cpp cluster/Cluster.cpp cluster/Cluster.h cluster/UpdateClient.cpp

Author: aconway
Date: Thu Jan 29 21:47:21 2009
New Revision: 739031

URL: http://svn.apache.org/viewvc?rev=739031&view=rev
Log:
Better error messages for not-attached exceptions.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Thu Jan 29 21:47:21 2009
@@ -34,6 +34,8 @@
 using namespace framing;
 using namespace std;
 
+#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached"))
+
 SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
     : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
 
@@ -61,14 +63,6 @@
 
 } // namespace
 
-void SessionHandler::checkAttached() {
-    if (!getState())
-        throw NotAttachedException(
-            QPID_MSG("Channel " << channel.get() << " is not attached"));
-    assert(getInHandler());
-    assert(channel.next);
-}
-
 void SessionHandler::invoke(const AMQMethodBody& m) {
     framing::invoke(*this, m);    
 }
@@ -82,7 +76,7 @@
         else if (isSessionControl(m))
             invoke(*m);
         else {
-            checkAttached();
+            CHECK_ATTACHED("receiving " << f);
             if (!receiveReady)
                 throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
             if (!getState()->receiverRecord(f))
@@ -126,7 +120,7 @@
 } // namespace
 
 void SessionHandler::handleOut(AMQFrame& f) {
-    checkAttached();
+    CHECK_ATTACHED("sending " << f);
     if (!sendReady)
         throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
     getState()->senderRecord(f); 
@@ -137,14 +131,6 @@
     channel.handle(f);
 }
 
-void SessionHandler::checkName(const std::string& name) {
-    checkAttached();
-    if (name != getState()->getId().getName())
-        throw InvalidArgumentException(
-            QPID_MSG("Incorrect session name: " << name
-                     << ", expecting: " << getState()->getId().getName()));
-}
-
 void SessionHandler::attach(const std::string& name_, bool force) {
     // Save the name for possible session-busy exception. Session-busy
     // can be thrown before we have attached the handler to a valid
@@ -164,18 +150,27 @@
         sendCommandPoint(getState()->senderGetCommandPoint());
 }
 
+#define CHECK_NAME(NAME, MSG) do {                                       \
+    CHECK_ATTACHED(MSG);                                                \
+    if (NAME != getState()->getId().getName())                          \
+        throw InvalidArgumentException(                                 \
+            QPID_MSG(MSG << ": incorrect session name: " << NAME        \
+                     << ", expecting: " << getState()->getId().getName())); \
+    } while(0)
+
+
 void SessionHandler::attached(const std::string& name) {
-    checkName(name);
+    CHECK_NAME(name, "session.attached");
 }
 
 void SessionHandler::detach(const std::string& name) {
-    checkName(name);
+    CHECK_NAME(name, "session.detach");
     peer.detached(name, session::DETACH_CODE_NORMAL);
     handleDetach();
 }
 
 void SessionHandler::detached(const std::string& name, uint8_t code) {
-    checkName(name);
+    CHECK_NAME(name, "session.detached");
     ignoring = false;
     if (code != session::DETACH_CODE_NORMAL)
         channelException(convert(code), "session.detached from peer.");
@@ -189,18 +184,18 @@
 }
 
 void SessionHandler::requestTimeout(uint32_t t) {
-    checkAttached();
+    CHECK_ATTACHED("session.request-timeout");
     getState()->setTimeout(t);
     peer.timeout(t);
 }
 
 void SessionHandler::timeout(uint32_t t) {
-    checkAttached();
+    CHECK_ATTACHED("session.request-timeout");
     getState()->setTimeout(t);
 }
 
 void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
-    checkAttached();
+    CHECK_ATTACHED("session.command-point");
     getState()->receiverSetCommandPoint(SessionPoint(id, offset));
     if (!receiveReady) {
         receiveReady = true;
@@ -209,7 +204,7 @@
 }
 
 void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
-    checkAttached();
+    CHECK_ATTACHED("session.expected");
     if (getState()->hasState()) { // Replay
         if (commands.empty()) throw IllegalStateException(
             QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));        
@@ -225,14 +220,14 @@
 }
 
 void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
-    checkAttached();
+    CHECK_ATTACHED("session.confirmed");
     // Ignore non-contiguous confirmations.
     if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint()) 
         getState()->senderConfirmed(commands.rangesBegin()->last());
 }
 
 void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
-    checkAttached();
+    CHECK_ATTACHED("session.completed");
     getState()->senderCompleted(commands);
     if (getState()->senderNeedKnownCompleted() || timelyReply) {
         peer.knownCompleted(commands);
@@ -241,12 +236,12 @@
 }
 
 void SessionHandler::knownCompleted(const SequenceSet& commands) {
-    checkAttached();
+    CHECK_ATTACHED("session.known-completed");
     getState()->receiverKnownCompleted(commands);
 }
 
 void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
-    checkAttached();
+    CHECK_ATTACHED("session.flush");
     if (expected)  {
         SequenceSet expectSet;
         if (getState()->hasState())
@@ -270,19 +265,19 @@
 
 void SessionHandler::sendDetach()
 {
-    checkAttached();
+    CHECK_ATTACHED("session.sendDetach");
     ignoring = true;
     peer.detach(getState()->getId().getName());
 }
 
 void SessionHandler::sendCompletion() {
-    checkAttached();
+    CHECK_ATTACHED("session.send-completion");
     const SequenceSet& c = getState()->receiverGetUnknownComplete();
     peer.completed(c, getState()->receiverNeedKnownCompleted());
 }
 
 void SessionHandler::sendAttach(bool force) {
-    checkAttached();
+    CHECK_ATTACHED("session.send-attach");
     QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
     peer.attach(getState()->getId().getName(), force);
     if (getState()->hasState())
@@ -306,7 +301,7 @@
 }
 
 void SessionHandler::sendTimeout(uint32_t t) {
-    checkAttached();
+    CHECK_ATTACHED("session.send-timeout");
     peer.requestTimeout(t);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Thu Jan 29 21:47:21 2009
@@ -98,9 +98,6 @@
     virtual void handleIn(framing::AMQFrame&);
     virtual void handleOut(framing::AMQFrame&);
 
-    void checkAttached();
-    void checkName(const std::string& name);
-
     framing::ChannelHandler channel;
     framing::AMQP_AllProxy::Session  peer;
     bool ignoring;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Thu Jan 29 21:47:21 2009
@@ -90,7 +90,7 @@
 // 
 void SessionHandler::attached(const std::string& name) {
     if (session.get()) {
-        checkName(name);
+        amqp_0_10::SessionHandler::attached(name);
     } else {
         SessionId id(connection.getUserId(), name);
         SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan 29 21:47:21 2009
@@ -65,12 +65,6 @@
 using qpid::management::Args;
 namespace _qmf = ::qmf::org::apache::qpid::cluster;
 
-/**@file
-   Threading notes:
-   - Public functions may be called in local connection IO threads.
-   see .h.
-*/ 
-
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
     MemberId member;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan 29 21:47:21 2009
@@ -58,9 +58,6 @@
 
 /**
  * Connection to the cluster
- *
- * Threading notes: 3 thread categories: connection, deliver, update.
- * 
  */
 class Cluster : private Cpg::Handler, public management::Manageable {
   public:

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=739031&r1=739030&r2=739031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Jan 29 21:47:21 2009
@@ -104,7 +104,7 @@
 const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); 
 
 void UpdateClient::update() {
-    QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl);
+    QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
 
@@ -144,7 +144,7 @@
 } // namespace
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
-    QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName());
+    QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
     ClusterConnectionProxy proxy(session);
     proxy.exchange(encode(*ex));
 }
@@ -187,7 +187,7 @@
 
 
 void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
-    QPID_LOG(debug, updaterId << " updateing queue " << q->getName());
+    QPID_LOG(debug, updaterId << " updating queue " << q->getName());
     ClusterConnectionProxy proxy(session);
     proxy.queue(encode(*q));
     MessageUpdater updater(q->getName(), session);
@@ -201,7 +201,7 @@
 }
 
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
-    QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection);
+    QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -216,7 +216,7 @@
 }
 
 void UpdateClient::updateSession(broker::SessionHandler& sh) {
-    QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection()  << "[" << sh.getChannel() << "] = "
+    QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection()  << "[" << sh.getChannel() << "] = "
              << sh.getSession()->getId());
     broker::SessionState* ss = sh.getSession();
     if (!ss) return;            // no session.
@@ -230,10 +230,10 @@
     // Re-create session state on remote connection.
 
     // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
-    QPID_LOG(debug, updaterId << " updateing consumers.");
+    QPID_LOG(debug, updaterId << " updating consumers.");
     ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
 
-    QPID_LOG(debug, updaterId << " updateing unacknowledged messages.");
+    QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),  boost::bind(&UpdateClient::updateUnacked, this, _1));
 
@@ -267,7 +267,7 @@
 }
 
 void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
-    QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId());
+    QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
     using namespace message;
     shadowSession.messageSubscribe(
         arg::queue       = ci->getQueue()->getName(),
@@ -354,7 +354,7 @@
 };
     
 void UpdateClient::updateTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, updaterId << " updateing TX transaction state.");
+    QPID_LOG(debug, updaterId << " updating TX transaction state.");
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
     broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org