You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/10 10:41:06 UTC

svn commit: r574176 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/client/ qpid/framing/ tests/

Author: gsim
Date: Mon Sep 10 01:41:05 2007
New Revision: 574176

URL: http://svn.apache.org/viewvc?rev=574176&view=rev
Log:
Client side support for message and delivery properties in header segments.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 10 01:41:05 2007
@@ -132,6 +132,7 @@
   qpid/framing/Blob.cpp \
   qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
   qpid/framing/MethodHolderMaxSize.h \
+  qpid/framing/TransferContent.cpp \
   qpid/Exception.cpp \
   qpid/Plugin.h \
   qpid/Plugin.cpp \
@@ -178,6 +179,7 @@
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/Message.cpp \
+  qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
   qpid/broker/MessageDelivery.cpp \
   qpid/broker/MessageHandlerImpl.cpp \
@@ -344,6 +346,7 @@
   qpid/framing/SequenceNumberSet.h \
   qpid/framing/SerializeHandler.h \
   qpid/framing/StructHelper.h \
+  qpid/framing/TransferContent.h \
   qpid/framing/TypeFilter.h \
   qpid/framing/Value.h \
   qpid/framing/Visitor.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Mon Sep 10 01:41:05 2007
@@ -68,7 +68,7 @@
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg);
-            alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+            alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
         }
     } else {
 
@@ -358,7 +358,7 @@
         while(!messages.empty()){
             DeliverableMessage msg(messages.front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
-                                     &(msg.getMessage().getApplicationHeaders()));
+                                     msg.getMessage().getApplicationHeaders());
             pop();
         }
         alternateExchange->decAlternateUsers();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep 10 01:41:05 2007
@@ -95,7 +95,7 @@
     Exchange::shared_ptr alternate = queue->getAlternateExchange();
     if (alternate) {
         DeliverableMessage delivery(msg.payload);
-        alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+        alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
         QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " 
                  << alternate->getName());
     } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Sep 10 01:41:05 2007
@@ -38,12 +38,12 @@
 
 Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
 
-const std::string& Message::getRoutingKey() const
+std::string Message::getRoutingKey() const
 {
     return getAdapter().getRoutingKey(frames);
 }
 
-const std::string& Message::getExchangeName() const 
+std::string Message::getExchangeName() const 
 {
     return getAdapter().getExchange(frames);
 }
@@ -61,7 +61,7 @@
     return getAdapter().isImmediate(frames);
 }
 
-const FieldTable& Message::getApplicationHeaders() const
+const FieldTable* Message::getApplicationHeaders() const
 {
     return getAdapter().getApplicationHeaders(frames);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Sep 10 01:41:05 2007
@@ -59,11 +59,11 @@
 
     uint64_t contentSize() const;
 
-    const std::string& getRoutingKey() const;
+    std::string getRoutingKey() const;
     const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
-    const std::string& getExchangeName() const;
+    std::string getExchangeName() const;
     bool isImmediate() const;
-    const framing::FieldTable& getApplicationHeaders() const;
+    const framing::FieldTable* getApplicationHeaders() const;
     bool isPersistent();
 
     framing::FrameSet& getFrames() { return frames; } 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Mon Sep 10 01:41:05 2007
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageAdapter.h"
+
+namespace {
+    const std::string empty;
+}
+
+namespace qpid {
+namespace broker{
+
+    std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getRoutingKey();
+    }
+
+    std::string PublishAdapter::getExchange(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getExchange();
+    }
+
+    bool PublishAdapter::isImmediate(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getImmediate();
+    }
+
+    const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f)
+    {
+        const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+        return p ? &(p->getHeaders()) : 0;
+    }
+
+    bool PublishAdapter::isPersistent(const framing::FrameSet& f)
+    {
+        const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+        return p && p->getDeliveryMode() == 2;
+    }
+
+    std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
+    {
+        const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+        return p ? p->getRoutingKey() : empty;
+    }
+
+    std::string TransferAdapter::getExchange(const framing::FrameSet& f)
+    {
+        return f.as<framing::MessageTransferBody>()->getDestination();
+    }
+
+    bool TransferAdapter::isImmediate(const framing::FrameSet&)
+    {
+        //TODO: we seem to have lost the immediate flag
+        return false;
+    }
+
+    const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
+    {
+        const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+        return p ? &(p->getApplicationHeaders()) : 0;
+    }
+
+    bool TransferAdapter::isPersistent(const framing::FrameSet& f)
+    {
+        const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+        return p && p->getDeliveryMode() == 2;
+    }
+
+}}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Mon Sep 10 01:41:05 2007
@@ -38,68 +38,29 @@
 {
     virtual ~MessageAdapter() {}
 
-    virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
-    virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+    virtual std::string getRoutingKey(const framing::FrameSet& f) = 0;
+    virtual std::string getExchange(const framing::FrameSet& f) = 0;
     virtual bool isImmediate(const framing::FrameSet& f) = 0;
-    virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+    virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
     virtual bool isPersistent(const framing::FrameSet& f) = 0;
 };
 
 struct PublishAdapter : MessageAdapter
 {
-    const std::string& getRoutingKey(const framing::FrameSet& f)
-    {
-        return f.as<framing::BasicPublishBody>()->getRoutingKey();
-    }
-
-    const std::string& getExchange(const framing::FrameSet& f)
-    {
-        return f.as<framing::BasicPublishBody>()->getExchange();
-    }
-
-    bool isImmediate(const framing::FrameSet& f)
-    {
-        return f.as<framing::BasicPublishBody>()->getImmediate();
-    }
-
-    const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
-    {
-        return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
-    }
-
-    bool isPersistent(const framing::FrameSet& f)
-    {
-        return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
-    }
+    std::string getRoutingKey(const framing::FrameSet& f);
+    std::string getExchange(const framing::FrameSet& f);
+    bool isImmediate(const framing::FrameSet& f);
+    const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+    bool isPersistent(const framing::FrameSet& f);
 };
 
 struct TransferAdapter : MessageAdapter
 {
-    const std::string& getRoutingKey(const framing::FrameSet& f)
-    {
-        return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
-    }
-
-    const std::string& getExchange(const framing::FrameSet& f)
-    {
-        return f.as<framing::MessageTransferBody>()->getDestination();
-    }
-
-    bool isImmediate(const framing::FrameSet&)
-    {
-        //TODO: we seem to have lost the immediate flag
-        return false;
-    }
-
-    const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
-    {
-        return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
-    }
-
-    bool isPersistent(const framing::FrameSet& f)
-    {
-        return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
-    }
+    std::string getRoutingKey(const framing::FrameSet& f);
+    std::string getExchange(const framing::FrameSet& f);
+    bool isImmediate(const framing::FrameSet&);
+    const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+    bool isPersistent(const framing::FrameSet& f);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Sep 10 01:41:05 2007
@@ -22,8 +22,8 @@
 
 #include "Message.h"
 #include "MessageStore.h"
-#include "qpid/Exception.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/reply_exceptions.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -46,7 +46,7 @@
         checkType(CONTENT_BODY, frame.getBody()->type());
         break;
     default:
-        throw ConnectionException(504, "Invalid frame sequence for message.");        
+        throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
     }
     if (staging) {
         store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
@@ -61,13 +61,6 @@
     }
 }
 
-void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
-{
-    if (expected != actual) {
-        throw ConnectionException(504, "Invalid frame sequence for message.");
-    }
-}
-
 void MessageBuilder::end()
 {
     message.reset();
@@ -80,4 +73,33 @@
     message = Message::shared_ptr(new Message(id));
     state = METHOD;
     staging = false;
+}
+
+namespace {
+
+const std::string HEADER_BODY_S = "HEADER";
+const std::string METHOD_BODY_S = "METHOD";
+const std::string CONTENT_BODY_S = "CONTENT";
+const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
+const std::string UNKNOWN = "unknown";
+
+std::string type_str(uint8_t type) 
+{
+    switch(type) {
+    case METHOD_BODY: return METHOD_BODY_S;
+    case HEADER_BODY: return HEADER_BODY_S;
+    case CONTENT_BODY: return CONTENT_BODY_S;
+    case HEARTBEAT_BODY: return HEARTBEAT_BODY_S;
+    }
+    return UNKNOWN;
+}
+
+}
+
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+    if (expected != actual) {
+        throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " 
+                                               << type_str(expected) << " got " << type_str(actual) << ")"));
+    }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Mon Sep 10 01:41:05 2007
@@ -336,13 +336,13 @@
         cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
     }
 
-    cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+    cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
 
     if (!strategy.delivered) {
         //TODO:if reject-unroutable, then reject
         //else route to alternate exchange
         if (cacheExchange->getAlternate()) {
-            cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+            cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
         }
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Mon Sep 10 01:41:05 2007
@@ -58,8 +58,14 @@
     bool isRedelivered() const { return redelivered; }
     void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
 
-    const HeaderProperties& getMethodHeaders() const { return *this; }
-
+    framing::AMQHeaderBody getHeader() const
+    { 
+        framing::AMQHeaderBody header;
+        BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
+        BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
+        properties->setContentLength(data.size());
+        return header;
+    }
 
     //TODO: move this elsewhere (GRS 24/08/2007)
     void populate(framing::FrameSet& frameset)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Sep 10 01:41:05 2007
@@ -24,6 +24,7 @@
 
 using namespace qpid::client;
 using namespace qpid::framing;
+using namespace qpid::sys;
 
 ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
 {
@@ -38,6 +39,7 @@
 
 void ConnectionImpl::allocated(SessionCore::shared_ptr session)
 {
+    Mutex::ScopedLock l(lock);
     if (sessions.find(session->getId()) != sessions.end()) {
         throw Exception("Id already in use.");
     }
@@ -46,6 +48,7 @@
 
 void ConnectionImpl::released(SessionCore::shared_ptr session)
 {
+    Mutex::ScopedLock l(lock);
     SessionMap::iterator i = sessions.find(session->getId());
     if (i != sessions.end()) {
         sessions.erase(i);
@@ -59,12 +62,7 @@
 
 void ConnectionImpl::incoming(framing::AMQFrame& frame)
 {
-    uint16_t id = frame.getChannel();
-    SessionMap::iterator i = sessions.find(id);
-    if (i == sessions.end()) {
-        throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
-    }
-    i->second->handle(frame);
+    find(frame.getChannel())->handle(frame);
 }
 
 void ConnectionImpl::open(const std::string& host, int port,
@@ -93,10 +91,7 @@
 
 void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
 {
-    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
-        i->second->closed(code, text);
-    }
-    sessions.clear();
+    signalClose(code, text);
     connector->close();
 }
 
@@ -114,8 +109,25 @@
 void ConnectionImpl::shutdown() 
 {
     //this indicates that the socket to the server has closed
+    signalClose(0, "Unexpected socket closure.");
+}
+
+void ConnectionImpl::signalClose(uint16_t code, const std::string& text) 
+{
+    Mutex::ScopedLock l(lock);
     for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
-        i->second->closed(0, "Unexpected socket closure.");
+        Mutex::ScopedUnlock u(lock);
+        i->second->closed(code, text);
     }
     sessions.clear();
+}
+
+SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
+{
+    Mutex::ScopedLock l(lock);
+    SessionMap::iterator i = sessions.find(id);
+    if (i == sessions.end()) {
+        throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
+    }
+    return i->second;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Mon Sep 10 01:41:05 2007
@@ -25,6 +25,7 @@
 #include <map>
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/sys/ShutdownHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
 #include "ConnectionHandler.h"
@@ -44,6 +45,7 @@
     ConnectionHandler handler;
     boost::shared_ptr<Connector> connector;
     framing::ProtocolVersion version;
+    sys::Mutex lock;
 
     void incoming(framing::AMQFrame& frame);    
     void closed();
@@ -51,6 +53,9 @@
     void idleOut();
     void idleIn();
     void shutdown();
+    void signalClose(uint16_t, const std::string&);
+    SessionCore::shared_ptr find(uint16_t);
+
 public:
     typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Sep 10 01:41:05 2007
@@ -181,31 +181,28 @@
                                       CompletionTracker::ResultListener l)
 {
     SequenceNumber id = send(command, l);
-    sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+    sendContent(content);
     return id;
 }
 
-void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+void ExecutionHandler::sendContent(const MethodContent& content)
 {
-    AMQHeaderBody header;
-    BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
-    header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
-    AMQFrame h(0, header);
-    out(h);
+    AMQFrame header(0, content.getHeader());
+    out(header);
 
-    u_int64_t data_length = data.length();
+    u_int64_t data_length = content.getData().length();
     if(data_length > 0){
         //frame itself uses 8 bytes
         u_int32_t frag_size = maxFrameSize - 8;
         if(data_length < frag_size){
-            AMQFrame frame(0, AMQContentBody(data));
+            AMQFrame frame(0, AMQContentBody(content.getData()));
             out(frame);
         }else{
             u_int32_t offset = 0;
             u_int32_t remaining = data_length - offset;
             while (remaining > 0) {
                 u_int32_t length = remaining > frag_size ? frag_size : remaining;
-                string frag(data.substr(offset, length));
+                string frag(content.getData().substr(offset, length));
                 AMQFrame frame(0, AMQContentBody(frag));
                 out(frame);
                 offset += length;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Sep 10 01:41:05 2007
@@ -59,7 +59,7 @@
 
     void sendCompletion();
 
-    void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+    void sendContent(const framing::MethodContent&);
 
 public:
     typedef CompletionTracker::ResultListener ResultListener;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Mon Sep 10 01:41:05 2007
@@ -41,6 +41,6 @@
 {
     out << "content (" << size() << " bytes)";
 #ifndef NDEBUG
-    out << data.substr(0,10);
+    out << " " << data.substr(0,10);
 #endif
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Sep 10 01:41:05 2007
@@ -56,17 +56,17 @@
 
 const AMQMethodBody* FrameSet::getMethod() const
 {
-    return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
+    return parts.empty() ? 0 : parts[0].getMethod();
 }
 
 const AMQHeaderBody* FrameSet::getHeaders() const
 {
-    return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
+    return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
 }
 
 AMQHeaderBody* FrameSet::getHeaders()
 {
-    return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
+    return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
 }
 
 uint64_t FrameSet::getContentSize() const
@@ -80,4 +80,11 @@
 {
     AccumulateContent accumulator(out);
     map_if(accumulator, TypeFilter(CONTENT_BODY));
+}
+
+std::string FrameSet::getContent() const
+{
+    std::string out;
+    getContent(out);
+    return out;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Sep 10 01:41:05 2007
@@ -48,6 +48,7 @@
 
     uint64_t getContentSize() const;
     void getContent(std::string&) const;
+    std::string getContent() const;
 
     const AMQMethodBody* getMethod() const;
     const AMQHeaderBody* getHeaders() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h Mon Sep 10 01:41:05 2007
@@ -21,7 +21,8 @@
 #ifndef _MethodContent_
 #define _MethodContent_
 
-#include "HeaderProperties.h"
+#include <string>
+#include "AMQHeaderBody.h"
 
 namespace qpid {
 namespace framing {
@@ -31,7 +32,7 @@
 public:
     virtual ~MethodContent() {}
     //TODO: rethink this interface
-    virtual const HeaderProperties& getMethodHeaders() const = 0;
+    virtual AMQHeaderBody getHeader() const = 0;
     virtual const std::string& getData() const = 0;
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Mon Sep 10 01:41:05 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TransferContent.h"
+
+namespace qpid {
+namespace framing {
+
+TransferContent::TransferContent(const std::string& _data)
+{
+    setData(_data);
+}
+
+AMQHeaderBody TransferContent::getHeader() const
+{
+    return header;
+}
+
+const std::string& TransferContent::getData() const
+{
+    return data;
+}
+
+void TransferContent::setData(const std::string& _data)
+{
+    data = _data;
+    header.get<MessageProperties>(true)->setContentLength(data.size());
+}
+
+void TransferContent::appendData(const std::string& _data)
+{
+    data += _data;
+    header.get<MessageProperties>(true)->setContentLength(data.size());
+}
+
+MessageProperties& TransferContent::getMessageProperties()
+{
+    return *header.get<MessageProperties>(true);
+}
+
+DeliveryProperties& TransferContent::getDeliveryProperties()
+{
+    return *header.get<DeliveryProperties>(true);
+}
+
+}}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Mon Sep 10 01:41:05 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransferContent_
+#define _TransferContent_
+
+#include "MethodContent.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+
+namespace qpid {
+namespace framing {
+
+class TransferContent : public MethodContent // Method content made of an AMQHeaderBody plus a string payload.
+{
+    AMQHeaderBody header; // source of the MessageProperties / DeliveryProperties handed out by the accessors below
+    std::string data; // the payload
+public:
+    TransferContent(const std::string& data); // initialises the payload through setData()
+    AMQHeaderBody getHeader() const; // returns a copy of the header
+    void setData(const std::string&); // replaces the payload and sets the content length to its size
+    void appendData(const std::string&); // appends to the payload and sets the content length to the new total size
+    const std::string& getData() const; // returns the payload by const reference
+    MessageProperties& getMessageProperties(); // non-const reference obtained from the header
+    DeliveryProperties& getDeliveryProperties(); // non-const reference obtained from the header
+};
+
+}}
+#endif  

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Sep 10 01:41:05 2007
@@ -22,6 +22,7 @@
 #include "qpid_test_plugin.h"
 #include "InProcessBroker.h"
 #include "qpid/client/Session.h"
+#include "qpid/framing/TransferContent.h"
 
 using namespace qpid::client;
 using namespace qpid::framing;
@@ -29,7 +30,8 @@
 class ClientSessionTest : public CppUnit::TestCase
 {
     CPPUNIT_TEST_SUITE(ClientSessionTest);
-    CPPUNIT_TEST(testQueueQuery);;
+    CPPUNIT_TEST(testQueueQuery);
+    CPPUNIT_TEST(testTransfer);
     CPPUNIT_TEST_SUITE_END();
 
     boost::shared_ptr<Connector> broker;
@@ -55,14 +57,24 @@
         CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
     }
 
-    void testCompletion()
+    void testTransfer()
     {
         std::string queue("my-queue");
         std::string dest("my-dest");
+        std::string data("my message"); // payload that the assertion below expects to get back
         session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
-        //subcribe to the queue with confirm_mode = 1
+        //subscribe to the queue with confirm_mode = 1:
         session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
-        //publish some messages
+        //publish a message:
+        TransferContent content(data);
+        content.getDeliveryProperties().setRoutingKey("my-queue"); // same name as the queue declared above
+        session.messageTransfer(0, "", 0, 0, content); // NOTE(review): "" presumably selects the default exchange - confirm
+        //get & test the message:
+        FrameSet::shared_ptr msg = session.get();
+        CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
+        CPPUNIT_ASSERT_EQUAL(data, msg->getContent());
+        //confirm receipt:
+        session.execution().completed(msg->getId(), true, true);
     }
 };