You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/07 11:13:42 UTC

svn commit: r504485 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ tests/

Author: aconway
Date: Wed Feb  7 02:13:41 2007
New Revision: 504485

URL: http://svn.apache.org/viewvc?view=rev&rev=504485
Log:

* broker/BrokerMessageBase.h, broker/BrokerMessage.cpp: Moved the
  ConnectionToken publisher from BasicMessage into the Message base class.

* cpp/lib/broker/BrokerMessageMessage.cpp:
 - Added ConnectionToken publisher.
 - Implemented isPersistent (via getDeliveryMode), getApplicationHeaders

* cpp/lib/broker/Reference.cpp: Holds MessageMessage instead of just
  MessageTransferBody.

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Wed Feb  7 02:13:41 2007
@@ -188,7 +188,9 @@
     if(blocked) queue->dispatch();
 }
 
-void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){
+void Channel::handleInlineTransfer(
+    Message::shared_ptr msg, Exchange::shared_ptr& exch)
+{
     if(transactional){
         TxPublish* deliverable = new TxPublish(msg);
         exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Wed Feb  7 02:13:41 2007
@@ -139,7 +139,7 @@
     void handleContent(boost::shared_ptr<framing::AMQContentBody>);
     void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
     
-    void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange);
+    void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange);
     
     // For ChannelAdapter
     void handleMethodInContext(

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Wed Feb  7 02:13:41 2007
@@ -41,11 +41,10 @@
     const string& _exchange, const string& _routingKey, 
     bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
 ) :
-    Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
-    publisher(_publisher),
+    Message(_publisher, _exchange, _routingKey, _mandatory,
+            _immediate, respondTo),
     size(0)
-{
-}
+{}
 
 // FIXME aconway 2007-02-01: remove.
 // BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : 
@@ -56,7 +55,7 @@
 // }
 
 // For tests only.
-BasicMessage::BasicMessage() : publisher(0), size(0)
+BasicMessage::BasicMessage() : size(0)
 {}
 
 BasicMessage::~BasicMessage(){
@@ -126,10 +125,6 @@
     return getHeaderProperties()->getHeaders();
 }
 
-const ConnectionToken* const BasicMessage::getPublisher(){
-    return publisher;
-}
-
 bool BasicMessage::isPersistent()
 {
     if(!header) return false;
@@ -230,12 +225,14 @@
         store->stage(this);
     }
     if (!content.get() || content->size() > 0) {
+        // FIXME aconway 2007-02-07: handle MessageMessage.
         //set content to lazy loading mode (but only if there is stored content):
 
         //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
         //      then set as a member of that message so its lifetime is guaranteed to be no longer than
         //      that of the message itself
-        content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
+        content = std::auto_ptr<Content>(
+            new LazyLoadedContent(store, this, expectedContentSize()));
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Wed Feb  7 02:13:41 2007
@@ -52,7 +52,6 @@
  * request.
  */
 class BasicMessage : public Message {
-    const ConnectionToken* const publisher;
     framing::AMQHeaderBody::shared_ptr header;
     std::auto_ptr<Content> content;
     sys::Mutex contentLock;
@@ -72,7 +71,6 @@
     void setHeader(framing::AMQHeaderBody::shared_ptr header);
     void addContent(framing::AMQContentBody::shared_ptr data);
     bool isComplete();
-    const ConnectionToken* const getPublisher();
 
     void deliver(framing::ChannelAdapter&, 
                  const string& consumerTag, 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Wed Feb  7 02:13:41 2007
@@ -40,10 +40,10 @@
 class FieldTable;
 }
 	
-namespace broker {
 
-class MessageStore;
+namespace broker {
 class ConnectionToken;
+class MessageStore;
 
 /**
  * Base class for all types of internal broker messages
@@ -51,6 +51,7 @@
  * TODO; AMS: for the moment this is mostly a placeholder
  */
 class Message{
+    const ConnectionToken* publisher;
     std::string exchange;
     std::string routingKey;
     const bool mandatory;
@@ -62,9 +63,12 @@
   public:
     typedef boost::shared_ptr<Message> shared_ptr;
 
-    Message(const std::string& _exchange, const std::string& _routingKey, 
+    Message(const ConnectionToken* publisher_,
+            const std::string& _exchange,
+            const std::string& _routingKey, 
             bool _mandatory, bool _immediate,
             framing::AMQMethodBody::shared_ptr respondTo_) :
+        publisher(publisher_),
         exchange(_exchange),
         routingKey(_routingKey),
         mandatory(_mandatory),
@@ -122,7 +126,9 @@
     virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
     virtual const framing::FieldTable& getApplicationHeaders() = 0;
     virtual bool isPersistent() = 0;
-    virtual const ConnectionToken* const getPublisher() = 0;
+    virtual const ConnectionToken* getPublisher() const {
+        return publisher;
+    }
 
     virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
     virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Wed Feb  7 02:13:41 2007
@@ -25,26 +25,24 @@
 #include "MessageAppendBody.h"
 #include "Reference.h"
 #include "framing/FieldTable.h"
+#include "framing/BasicHeaderProperties.h"
 
 #include <iostream>
 
 using namespace std;
-using namespace qpid::broker;
 using namespace qpid::framing;
-	
-MessageMessage::MessageMessage(TransferPtr transfer_)
-    : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
-              transfer_->getMandatory(), transfer_->getImmediate(),
-              transfer_),
-      transfer(transfer_)
-{}
 
-MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
-    : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
-              transfer_->getMandatory(), transfer_->getImmediate(),
-              transfer_),
-      transfer(transfer_),
-      appends(ref.getAppends())
+namespace qpid {
+namespace broker {
+	
+MessageMessage::MessageMessage(
+    ConnectionToken* publisher, TransferPtr transfer_
+) : Message(publisher, transfer_->getDestination(),
+            transfer_->getRoutingKey(),
+            transfer_->getMandatory(),
+            transfer_->getImmediate(),
+            transfer_),
+    transfer(transfer_)
 {}
 
 void MessageMessage::deliver(
@@ -55,29 +53,29 @@
 {
     channel.send(
     	new MessageTransferBody(channel.getVersion(), 
-        transfer->getTicket(),
-        consumerTag,
-        getRedelivered(),
-        transfer->getImmediate(),
-        transfer->getTtl(),
-        transfer->getPriority(),
-        transfer->getTimestamp(),
-        transfer->getDeliveryMode(),
-        transfer->getExpiration(),
-        getExchange(),
-        getRoutingKey(),
-        transfer->getMessageId(),
-        transfer->getCorrelationId(),
-        transfer->getReplyTo(),
-        transfer->getContentType(),
-        transfer->getContentEncoding(),
-        transfer->getUserId(),
-        transfer->getAppId(),
-        transfer->getTransactionId(),
-        transfer->getSecurityToken(),
-        transfer->getApplicationHeaders(),
-        transfer->getBody(),
-        transfer->getMandatory()));
+                                transfer->getTicket(),
+                                consumerTag,
+                                getRedelivered(),
+                                transfer->getImmediate(),
+                                transfer->getTtl(),
+                                transfer->getPriority(),
+                                transfer->getTimestamp(),
+                                transfer->getDeliveryMode(),
+                                transfer->getExpiration(),
+                                getExchange(),
+                                getRoutingKey(),
+                                transfer->getMessageId(),
+                                transfer->getCorrelationId(),
+                                transfer->getReplyTo(),
+                                transfer->getContentType(),
+                                transfer->getContentEncoding(),
+                                transfer->getUserId(),
+                                transfer->getAppId(),
+                                transfer->getTransactionId(),
+                                transfer->getSecurityToken(),
+                                transfer->getApplicationHeaders(),
+                                transfer->getBody(),
+                                transfer->getMandatory()));
 }
 
 void MessageMessage::sendGetOk(
@@ -107,19 +105,11 @@
 
 const FieldTable& MessageMessage::getApplicationHeaders()
 {
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return transfer->getApplicationHeaders();
 }
 bool MessageMessage::isPersistent()
 {
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
-    return false;               // FIXME aconway 2007-02-05: 
-}
-
-const ConnectionToken* const MessageMessage::getPublisher()
-{
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
-    return 0;               // FIXME aconway 2007-02-05: 
+    return transfer->getDeliveryMode() == PERSISTENT;
 }
 
 u_int32_t MessageMessage::encodedSize()
@@ -146,3 +136,5 @@
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
+
+}} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Wed Feb  7 02:13:41 2007
@@ -35,19 +35,25 @@
 }
 	
 namespace broker {
+class ConnectionToken;
 class Reference;
 
 class MessageMessage: public Message{
   public:
-    typedef Reference::TransferPtr TransferPtr;
+    typedef boost::shared_ptr<MessageMessage> shared_ptr;
+    typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
     typedef Reference::AppendPtr AppendPtr;
     typedef Reference::Appends Appends;
 
-    MessageMessage(TransferPtr transfer);
-    MessageMessage(TransferPtr transfer, const Reference&);
+    MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
             
     // Default destructor okay
-			            
+
+    TransferPtr getTransfer() { return transfer; }
+
+    const Appends& getAppends() { return appends; }
+    void setAppends(const Appends& appends_) { appends = appends_; }
+    
     void deliver(framing::ChannelAdapter& channel, 
                  const std::string& consumerTag, 
                  u_int64_t deliveryTag, 
@@ -64,19 +70,16 @@
     framing::BasicHeaderProperties* getHeaderProperties();
     const framing::FieldTable& getApplicationHeaders();
     bool isPersistent();
-    const ConnectionToken* const getPublisher();
             
     u_int32_t encodedSize();
     u_int32_t encodedHeaderSize();
     u_int32_t encodedContentSize();
     u_int64_t expectedContentSize();
 
-    TransferPtr getTransfer() { return transfer; }
-    const Appends& getAppends() { return appends; }
   private:
 
     const TransferPtr transfer;
-    const Appends appends;
+    Appends appends;
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Wed Feb  7 02:13:41 2007
@@ -217,14 +217,13 @@
     MessageTransferBody::shared_ptr transfer(
         boost::shared_polymorphic_downcast<MessageTransferBody>(
             context.methodBody));
-    if (body.isInline()) {
-        Message::shared_ptr msg(new MessageMessage(transfer));
-        channel.handleInlineTransfer(msg, exchange);
-    }
-    else {
-        // Add to reference.
-        references.get(body.getValue()).transfer(transfer);
-    }
+    MessageMessage::shared_ptr message(
+        new MessageMessage(&connection, transfer));
+    
+    if (body.isInline()) 
+        channel.handleInlineTransfer(message, exchange);
+    else 
+        references.get(body.getValue()).addMessage(message);
     client.ok(context);
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp Wed Feb  7 02:13:41 2007
@@ -43,14 +43,14 @@
 }
 
 void  Reference::close() {
-    for_each(transfers.begin(), transfers.end(),
+    for_each(messages.begin(), messages.end(),
              boost::bind(&Reference::complete, this, _1));
     registry->references.erase(getId());
 }
 
-void Reference::complete(TransferPtr transfer) {
-    MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
-    registry->handler.complete(msg);
+void Reference::complete(MessagePtr message) {
+    message->setAppends(appends);
+    registry->handler.complete(message);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h Wed Feb  7 02:13:41 2007
@@ -28,20 +28,21 @@
 namespace qpid {
 
 namespace framing {
-class MessageTransferBody;
 class MessageAppendBody;
 }
 
 namespace broker {
 
+class MessageMessage;
 class CompletionHandler;
 class ReferenceRegistry;
 
 /**
  * A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands, so
- * the reference tracks which commands are using it. When the reference
- * is closed, all the associated transfers are completed.
+ * message. A reference can be used by multiple transfer commands to
+ * create multiple messages, so the reference tracks which commands
+ * are using it. When the reference is closed, all the associated
+ * transfers are completed.
  *
  * THREAD UNSAFE: per-channel resource, access to channels is
  * serialized.
@@ -50,8 +51,8 @@
 {
   public:
     typedef std::string Id;
-    typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
-    typedef std::vector<TransferPtr> Transfers;
+    typedef boost::shared_ptr<MessageMessage> MessagePtr;
+    typedef std::vector<MessagePtr> Messages;
     typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
     typedef std::vector<AppendPtr> Appends;
 
@@ -60,24 +61,24 @@
     
     const std::string& getId() const { return id; }
 
-    /** Add a transfer to be completed with this reference */
-    void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+    /** Add a message to be completed with this reference */
+    void addMessage(MessagePtr message) { messages.push_back(message); }
 
     /** Append more data to the reference */
     void append(AppendPtr ptr) { appends.push_back(ptr); }
 
-    /** Close the reference, complete each associated transfer */
+    /** Close the reference, complete each associated message */
     void close();
 
     const Appends& getAppends() const { return appends; }
-    const Transfers& getTransfers() const { return transfers; }
+    const Messages& getMessages() const { return messages; }
     
   private:
-    void complete(TransferPtr transfer);
+    void complete(MessagePtr message);
     
     Id id;
     ReferenceRegistry* registry;
-    Transfers transfers;
+    Messages messages;
     Appends appends;
 };
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp?view=diff&rev=504485&r1=504484&r2=504485
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp Wed Feb  7 02:13:41 2007
@@ -41,14 +41,20 @@
 
 
     struct MockCompletionHandler : public CompletionHandler {
-        std::vector<Message::shared_ptr> messages;
-        void complete(Message::shared_ptr msg) { messages.push_back(msg); }
+        std::vector<MessageMessage::shared_ptr> messages;
+        void complete(Message::shared_ptr m) {
+            MessageMessage::shared_ptr mm =
+                dynamic_pointer_cast<MessageMessage>(m);
+            CPPUNIT_ASSERT(mm);
+            messages.push_back(mm);
+        }
     };
 
     MockCompletionHandler handler;
     ProtocolVersion v;
     ReferenceRegistry registry;
     MessageTransferBody::shared_ptr t1, t2;
+    MessageMessage::shared_ptr m1, m2;
     MessageAppendBody::shared_ptr a1, a2;
   public:
 
@@ -56,6 +62,8 @@
         registry(handler),
         t1(new MessageTransferBody(v)),
         t2(new MessageTransferBody(v)),
+        m1(new MessageMessage(0, t1)),
+        m2(new MessageMessage(0, t2)),
         a1(new MessageAppendBody(v)),
         a2(new MessageAppendBody(v))
     {}
@@ -74,19 +82,11 @@
         } catch(...) {}
     }
 
-    MessageMessage& handlerMessage(size_t i) {
-        CPPUNIT_ASSERT(handler.messages.size() > i);
-        MessageMessage* msg = dynamic_cast<MessageMessage*>(
-            handler.messages[i].get());
-        CPPUNIT_ASSERT(msg);
-        return *msg;
-    }
-    
     void testReference() {
         Reference& ref = registry.open("foo");
-        ref.transfer(t1);
-        ref.transfer(t2);
-        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size());
+        ref.addMessage(m1);
+        ref.addMessage(m2);
+        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getMessages().size());
         ref.append(a1);
         ref.append(a2);
         CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size());
@@ -96,16 +96,16 @@
             CPPUNIT_FAIL("Expected exception");
         } catch(...) {}
 
-        vector<Message::shared_ptr>& messages = handler.messages;
+        vector<MessageMessage::shared_ptr>& messages = handler.messages;
         CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size());
 
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1);
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2);
-
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2);
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2);
+        CPPUNIT_ASSERT_EQUAL(messages[0], m1);
+        CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[1], a2);
+
+        CPPUNIT_ASSERT_EQUAL(messages[1], m2);
+        CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[1], a2);
     }