You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/06 16:01:52 UTC

svn commit: r504172 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ lib/common/framing/ tests/

Author: aconway
Date: Tue Feb  6 07:01:45 2007
New Revision: 504172

URL: http://svn.apache.org/viewvc?view=rev&rev=504172
Log:
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get destination exchange from Message,
  don't assume only one message in progress (could have multiple
  references open.)
* broker/BrokerMessageMessage.cpp,.h: Contains transfer body and
  vector of append bodies. Construct from Reference.
* broker/CompletionHandler.h: Extracted from BrokerMessage, used for
  MessageMessage also.
* broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to
  registry. 
* cpp/tests/start_broker: Increased wait time to 5 secs.
* cpp/tests/*: renamed DummyChannel  as MockChannel.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h
      - copied, changed from r503923, incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp   (with props)
Removed:
    incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb  6 07:01:45 2007
@@ -355,245 +355,5 @@
     assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
-
-//
-// Message class method handlers
-//
-void
-BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*bytes*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel,
-                                                const string& destination )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
-    connection.getChannel(channel).cancel(destination);
-
-    connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
-                                                    const string& /*reference*/,
-                                                    const string& /*identifier*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
-                                               const string& /*reference*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId,
-                    u_int16_t /*ticket*/,
-                    const string& queueName,
-                    const string& destination,
-                    bool noLocal,
-                    bool noAck,
-                    bool exclusive,
-                    const qpid::framing::FieldTable& filter )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-	
-    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
-    Channel& channel = connection.getChannel(channelId);
-    if(!destination.empty() && channel.exists(destination)){
-        throw ConnectionException(530, "Consumer tags must be unique");
-    }
-
-    try{
-        string newTag = destination;
-        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
-	    connection.client->getMessageHandler()->ok(channelId);
-
-        //allow messages to be dispatched if required as there is now a consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
-        else throw ChannelException(403, "Access would violate previously granted exclusivity");
-    }
-
-    connection.getChannel(channel).cancel(destination);
-
-    connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId,
-                                             u_int16_t /*ticket*/,
-                                             const string& queueName,
-                                             const string& /*destination*/,
-                                             bool noAck )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
-    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
-    
-    // FIXME: get is probably Basic specific
-    if(!connection.getChannel(channelId).get(queue, !noAck)){
-
-        connection.client->getMessageHandler()->empty(channelId);
-    }
-    
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
-                                                u_int64_t /*value*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-	
-    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
-    Channel& channel = connection.getChannel(channelId);
-    if(!destination.empty() && channel.exists(destination)){
-        throw ConnectionException(530, "Consumer tags must be unique");
-    }
-
-    try{
-        string newTag = destination;
-        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
-	    connection.client->getMessageHandler()->ok(channelId);
-
-        //allow messages to be dispatched if required as there is now a consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
-        else throw ChannelException(403, "Access would violate previously granted exclusivity");
-    }
-
-    connection.getChannel(channel).cancel(destination);
-
-    connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
-                                              const string& /*reference*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel,
-                    u_int32_t prefetchSize,
-                    u_int16_t prefetchCount,
-                    bool /*global*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-    
-    //TODO: handle global
-    connection.getChannel(channel).setPrefetchSize(prefetchSize);
-    connection.getChannel(channel).setPrefetchCount(prefetchCount);
-    
-    connection.client->getMessageHandler()->ok(channel);
-	
-    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);    
-    Channel& channel = connection.getChannel(channelId);
-    if(!destination.empty() && channel.exists(destination)){
-        throw ConnectionException(530, "Consumer tags must be unique");
-    }
-
-    try{
-        string newTag = destination;
-        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
-	    connection.client->getMessageHandler()->ok(channelId);
-
-        //allow messages to be dispatched if required as there is now a consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
-        else throw ChannelException(403, "Access would violate previously granted exclusivity");
-    }
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel,
-                                                 bool requeue )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
-    connection.getChannel(channel).recover(requeue);
-    
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
-                                                u_int16_t /*code*/,
-                                                const string& /*text*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
-                                                const string& /*reference*/,
-                                                const string& /*identifier*/ )
-{
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel,
-                    u_int16_t /*ticket*/,
-                    const string& /*destination*/,
-                    bool /*redelivered*/,
-                    bool immediate,
-                    u_int64_t /*ttl*/,
-                    u_int8_t /*priority*/,
-                    u_int64_t /*timestamp*/,
-                    u_int8_t /*deliveryMode*/,
-                    u_int64_t /*expiration*/,
-                    const string& exchangeName,
-                    const string& routingKey,
-                    const string& /*messageId*/,
-                    const string& /*correlationId*/,
-                    const string& /*replyTo*/,
-                    const string& /*contentType*/,
-                    const string& /*contentEncoding*/,
-                    const string& /*userId*/,
-                    const string& /*appId*/,
-                    const string& /*transactionId*/,
-                    const string& /*securityToken*/,
-                    const qpid::framing::FieldTable& /*applicationHeaders*/,
-                    qpid::framing::Content /*body*/ )
-{
-	assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
-	Exchange::shared_ptr exchange = exchangeName.empty() ?
-		connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
-	if(exchange){
-	    Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
-	    connection.getChannel(channel).handlePublish(msg, exchange);
-	}else{
-	    throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
-	}
-}
-
 }} // namespace qpid::broker
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb  6 07:01:45 2007
@@ -78,7 +78,7 @@
                       bool exclusive, ConnectionToken* const connection,
                       const FieldTable*)
 {
-	if(tag.empty()) tag = tagGenerator.generate();
+    if(tag.empty()) tag = tagGenerator.generate();
     ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
         queue->consume(c, exclusive);//may throw exception
@@ -187,6 +187,8 @@
     if(blocked) queue->dispatch();
 }
 
+// FIXME aconway 2007-02-05: Drop exchange member, calculate from
+// message in ::complete().
 void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
     Message::shared_ptr message(_message);
     exchange = _exchange;
@@ -207,19 +209,19 @@
     // TODO aconway 2007-01-17: Implement heartbeating.
 }
 
-void Channel::complete(Message::shared_ptr& msg){
-    if(exchange){
-        if(transactional){
-            TxPublish* deliverable = new TxPublish(msg);
-            exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
-            txBuffer.enlist(new DeletingTxOp(deliverable));
-        }else{
-            DeliverableMessage deliverable(msg);
-            exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
-        }
-        exchange.reset();
-    }else{
-        std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+void Channel::complete(Message::shared_ptr msg) {
+    Exchange::shared_ptr exchange =
+        connection.broker.getExchanges().get(msg->getExchange());
+    assert(exchange.get());
+    if(transactional) {
+        std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
+        exchange->route(*deliverable, msg->getRoutingKey(),
+                        &(msg->getHeaderProperties()->getHeaders()));
+        txBuffer.enlist(new DeletingTxOp(deliverable.release()));
+    } else {
+        DeliverableMessage deliverable(msg);
+        exchange->route(deliverable, msg->getRoutingKey(),
+                        &(msg->getHeaderProperties()->getHeaders()));
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb  6 07:01:45 2007
@@ -36,6 +36,7 @@
 #include <Prefetch.h>
 #include <TxBuffer.h>
 #include "framing/ChannelAdapter.h"
+#include "CompletionHandler.h"
 
 namespace qpid {
 namespace broker {
@@ -51,9 +52,8 @@
  * Maintains state for an AMQP channel. Handles incoming and
  * outgoing messages for that channel.
  */
-class Channel :
-        public framing::ChannelAdapter,
-        private MessageBuilder::CompletionHandler
+class Channel : public framing::ChannelAdapter,
+                public CompletionHandler
 {
     class ConsumerImpl : public virtual Consumer
     {
@@ -96,7 +96,7 @@
 
     boost::scoped_ptr<BrokerAdapter> adapter;
 
-    virtual void complete(Message::shared_ptr& msg);
+    virtual void complete(Message::shared_ptr msg);
     void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);            
     void cancel(consumer_iterator consumer);
     bool checkPrefetch(Message::shared_ptr& msg);

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Tue Feb  6 07:01:45 2007
@@ -18,25 +18,39 @@
  * under the License.
  *
  */
+#include <iostream>
 #include "BrokerMessageMessage.h"
+#include "MessageTransferBody.h"
+#include "MessageAppendBody.h"
+#include "Reference.h"
 
+using namespace std;
 using namespace qpid::broker;
 	
-MessageMessage::MessageMessage(
-    const qpid::framing::AMQMethodBody::shared_ptr _methodBody, 
-    const std::string& _exchange, const std::string& _routingKey, 
-    bool _mandatory, bool _immediate) :
-    Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
-    methodBody(_methodBody)
-{
-}
+MessageMessage::MessageMessage(TransferPtr transfer_)
+    : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+              transfer_->getMandatory(), transfer_->getImmediate(),
+              transfer_),
+      transfer(transfer_)
+{}
+
+MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
+    : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+              transfer_->getMandatory(), transfer_->getImmediate(),
+              transfer_),
+      transfer(transfer_),
+      appends(ref.getAppends())
+{}
 
 void MessageMessage::deliver(
-    framing::ChannelAdapter& /*out*/, 
+    framing::ChannelAdapter& /*channel*/,
     const std::string& /*consumerTag*/, 
     u_int64_t /*deliveryTag*/, 
     u_int32_t /*framesize*/)
 {
+    // FIXME aconway 2007-02-05:
+    cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
+         << " appends." << endl;
 }
 
 void MessageMessage::sendGetOk(
@@ -45,49 +59,50 @@
     u_int64_t /*deliveryTag*/, 
     u_int32_t /*framesize*/)
 {
+    // FIXME aconway 2007-02-05: 
 }
 
 bool MessageMessage::isComplete()
 {
-	return true;
+    return true;               // FIXME aconway 2007-02-05: 
 }
 
 u_int64_t MessageMessage::contentSize() const
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 
 qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 bool MessageMessage::isPersistent()
 {
-	return false;
+    return false;               // FIXME aconway 2007-02-05: 
 }
 
 const ConnectionToken* const MessageMessage::getPublisher()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedSize()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedHeaderSize()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedContentSize()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int64_t MessageMessage::expectedContentSize()
 {
-	return 0;
+    return 0;               // FIXME aconway 2007-02-05: 
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue Feb  6 07:01:45 2007
@@ -21,23 +21,28 @@
  * under the License.
  *
  */
-
+#include <vector>
 #include "BrokerMessageBase.h"
+#include "Reference.h"         
 
 namespace qpid {
+
 namespace framing {
-class AMQMethodBody;
+class MessageTransferBody;
+class MessageApppendBody;
 }
 	
 namespace broker {
-class MessageMessage: public Message{
-    const qpid::framing::AMQMethodBody::shared_ptr methodBody;
+class Reference;
 
+class MessageMessage: public Message{
   public:
-    MessageMessage(
-        const framing::AMQMethodBody::shared_ptr methodBody, 
-        const std::string& exchange, const std::string& routingKey, 
-        bool mandatory, bool immediate);
+    typedef Reference::TransferPtr TransferPtr;
+    typedef Reference::AppendPtr AppendPtr;
+    typedef  Reference::Appends Appends;
+
+    MessageMessage(TransferPtr transfer);
+    MessageMessage(TransferPtr transfer, const Reference&);
             
     // Default destructor okay
 			            
@@ -52,7 +57,7 @@
                    u_int32_t framesize);
 
     bool isComplete();
-            
+
     u_int64_t contentSize() const;
     qpid::framing::BasicHeaderProperties* getHeaderProperties();
     bool isPersistent();
@@ -62,10 +67,16 @@
     u_int32_t encodedHeaderSize();
     u_int32_t encodedContentSize();
     u_int64_t expectedContentSize();
+
+    TransferPtr getTransfer() { return transfer; }
+    const Appends& getAppends() { return appends; }
+  private:
+
+    const TransferPtr transfer;
+    const Appends appends;
 };
 
-}
-}
+}}
 
 
 #endif  /*!_broker_BrokerMessage_h*/

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h Tue Feb  6 07:01:45 2007
@@ -0,0 +1,39 @@
+#ifndef _broker_CompletionHandler_h
+#define _broker_CompletionHandler_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Callback interface to handle completion of a message.
+ */
+class CompletionHandler
+{
+  public:
+    virtual ~CompletionHandler(){}
+    virtual void complete(Message::shared_ptr) = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!_broker_CompletionHandler_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp Tue Feb  6 07:01:45 2007
@@ -59,7 +59,10 @@
 
 Exchange::shared_ptr ExchangeRegistry::get(const string& name){
     Mutex::ScopedLock locker(lock);
-    return exchanges[name];
+    Exchange::shared_ptr exchange =exchanges[name];
+    if (!exchange) 
+        throw ChannelException(404, "Exchange not found:" + name);
+    return exchange;
 }
 
 namespace 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am Tue Feb  6 07:01:45 2007
@@ -69,6 +69,8 @@
   QueueRegistry.h				\
   RecoveryManager.cpp				\
   RecoveryManager.h				\
+  Reference.cpp					\
+  Reference.h					\
   ConnectionFactory.cpp				\
   ConnectionFactory.h				\
   Connection.cpp				\

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp Tue Feb  6 07:01:45 2007
@@ -27,7 +27,10 @@
 using namespace qpid::framing;
 using std::auto_ptr;
 
-MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : 
+MessageBuilder::MessageBuilder(CompletionHandler* _handler,
+                               MessageStore* const _store,
+                               u_int64_t _stagingThreshold
+) : 
     handler(_handler),
     store(_store),
     stagingThreshold(_stagingThreshold)

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h Tue Feb  6 07:01:45 2007
@@ -29,22 +29,19 @@
 #include <AMQContentBody.h>
 #include <AMQHeaderBody.h>
 #include <BasicPublishBody.h>
+#include "CompletionHandler.h"
 
 namespace qpid {
     namespace broker {
         class MessageBuilder{
         public:
-            class CompletionHandler{
-            public:
-                virtual void complete(Message::shared_ptr&) = 0;
-                virtual ~CompletionHandler(){}
-            };
             MessageBuilder(CompletionHandler* _handler,
                            MessageStore* const store = 0,
                            u_int64_t stagingThreshold = 0);
             void initialise(Message::shared_ptr& msg);
-            void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
-            void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+            void setHeader(framing::AMQHeaderBody::shared_ptr& header);
+            void addContent(framing::AMQContentBody::shared_ptr& content);
+            Message::shared_ptr getMessage() { return message; }
         private:
             Message::shared_ptr message;
             CompletionHandler* handler;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue Feb  6 07:01:45 2007
@@ -23,6 +23,8 @@
 #include "Connection.h"
 #include "Broker.h"
 #include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
 
 namespace qpid {
 namespace broker {
@@ -33,23 +35,23 @@
 // Message class method handlers
 //
 void
-MessageHandlerImpl::append(const MethodContext&,
-                           const string& /*reference*/,
+MessageHandlerImpl::append(const MethodContext& context,
+                           const string& reference,
                            const string& /*bytes*/ )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    references.get(reference).append(
+        boost::shared_polymorphic_downcast<MessageAppendBody>(
+            context.methodBody));
+    sendOk(context);
 }
 
 
 void
-MessageHandlerImpl::cancel( const MethodContext& context,
-                            const string& destination )
+MessageHandlerImpl::cancel(const MethodContext& context,
+                           const string& destination )
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
     channel.cancel(destination);
-
-    connection.client->getMessageHandler()->ok(context);
+    sendOk(context);
 }
 
 void
@@ -61,10 +63,11 @@
 }
 
 void
-MessageHandlerImpl::close(const MethodContext&,
-                          const string& /*reference*/ )
+MessageHandlerImpl::close(const MethodContext& context,
+                          const string& reference)
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    references.get(reference).close();
+    sendOk(context);
 }
 
 void
@@ -88,13 +91,16 @@
         string newTag = destination;
         channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
 
-        connection.client->getMessageHandler()->ok(context);
+        sendOk(context);
 
         //allow messages to be dispatched if required as there is now a consumer:
         queue->dispatch();
     }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
-        else throw ChannelException(403, "Access would violate previously granted exclusivity");
+        if(exclusive)
+            throw ChannelException(403, "Exclusive access cannot be granted");
+        else
+            throw ChannelException(
+                403, "Access would violate previously granted exclusivity");
     }
 }
 
@@ -133,14 +139,15 @@
 void
 MessageHandlerImpl::ok( const MethodContext& )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
 }
 
 void
-MessageHandlerImpl::open(const MethodContext&,
-                         const string& /*reference*/ )
+MessageHandlerImpl::open(const MethodContext& context,
+                         const string& reference)
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    references.open(reference);
+    sendOk(context);
 }
 
 void
@@ -155,7 +162,7 @@
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
     
-    connection.client->getMessageHandler()->ok(context);
+    sendOk(context);
 }
 
 void
@@ -189,14 +196,14 @@
                              u_int16_t /*ticket*/,
                              const string& /*destination*/,
                              bool /*redelivered*/,
-                             bool immediate,
+                             bool /* immediate */,
                              u_int64_t /*ttl*/,
                              u_int8_t /*priority*/,
                              u_int64_t /*timestamp*/,
                              u_int8_t /*deliveryMode*/,
                              u_int64_t /*expiration*/,
                              const string& exchangeName,
-                             const string& routingKey,
+                             const string& /* routingKey */,
                              const string& /*messageId*/,
                              const string& /*correlationId*/,
                              const string& /*replyTo*/,
@@ -208,27 +215,28 @@
                              const string& /*securityToken*/,
                              const qpid::framing::FieldTable& /*applicationHeaders*/,
                              qpid::framing::Content body,
-                             bool mandatory )
+                             bool /* mandatory */ )
 {
     //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
-    Exchange::shared_ptr exchange = exchangeName.empty() ?
-        broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
-    if(exchange){
-    	if (body.isInline()) {
-            MessageMessage* msg =
-                new MessageMessage(context.methodBody, exchangeName,
-                                   routingKey, mandatory, immediate);
-            channel.handlePublish(msg, exchange);
-        
-            connection.client->getMessageHandler()->ok(context);
-    	} else {
-            // Don't handle reference content yet
-            assert(body.isInline());
-    	}
-    }else{
-        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    MessageTransferBody::shared_ptr transfer =
+        boost::shared_polymorphic_downcast<MessageTransferBody>(
+            context.methodBody);
+    // Verify the exchange exists, will throw if not.
+    broker.getExchanges().get(exchangeName);
+    if (body.isInline()) {
+        MessageMessage* msg = new MessageMessage(transfer);
+        // FIXME aconway 2007-02-05: Remove exchange parameter.
+        // use shared_ptr for message.
+        channel.handlePublish(msg, Exchange::shared_ptr());
+        sendOk(context);
+    } else {
+        references.get(body.getValue()).transfer(transfer);
     }
+}
+
+
+void MessageHandlerImpl::sendOk(const MethodContext& context) {
+    connection.client->getMessageHandler()->ok(context);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h Tue Feb  6 07:01:45 2007
@@ -19,23 +19,25 @@
  *
  */
 
+#include <memory>
+
 #include "AMQP_ServerOperations.h"
+#include "Reference.h"
+#include "BrokerChannel.h"
 
 namespace qpid {
 namespace broker {
 
-class Channel;
 class Connection;
 class Broker;
+class MessageMessage;
 
-class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler {
-    Channel& channel;
-    Connection& connection;
-    Broker& broker;
-
+class MessageHandlerImpl :
+        public framing::AMQP_ServerOperations::MessageHandler
+{
   public:
     MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
-        : channel(ch), connection(c), broker(b) {}
+        : channel(ch), connection(c), broker(b), references(ch) {}
 
     void append(const framing::MethodContext&,
                  const std::string& reference,
@@ -116,6 +118,13 @@
                    const framing::FieldTable& applicationHeaders,
                    framing::Content body,
                    bool mandatory );
+  private:
+    void sendOk(const framing::MethodContext&);
+    
+    Channel& channel;
+    Connection& connection;
+    Broker& broker;
+    ReferenceRegistry references;
 };
 
 }} // namespace qpid::broker

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp Tue Feb  6 07:01:45 2007
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/bind.hpp>
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "QpidError.h"
+#include "CompletionHandler.h"
+
+namespace qpid {
+namespace broker {
+
+Reference&  ReferenceRegistry::open(const Reference::Id& id) {
+    ReferenceMap::iterator i = references.find(id);
+    // TODO aconway 2007-02-05: should we throw Channel or Connection
+    // exceptions here?
+    if (i != references.end())
+        THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id);
+    return references[id] = Reference(id, this);
+}
+
+Reference&  ReferenceRegistry::get(const Reference::Id& id) {
+    ReferenceMap::iterator i = references.find(id);
+    if (i == references.end()) 
+        THROW_QPID_ERROR(
+            CLIENT_ERROR, "Attempt to use non-existent reference "+id);
+    return i->second;
+}
+
+void  Reference::close() {
+    for_each(transfers.begin(), transfers.end(),
+             boost::bind(&Reference::complete, this, _1));
+    registry->references.erase(getId());
+}
+
+void Reference::complete(TransferPtr transfer) {
+    MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
+    registry->handler.complete(msg);
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h Tue Feb  6 07:01:45 2007
@@ -0,0 +1,111 @@
+#ifndef _broker_Reference_h
+#define _broker_Reference_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+#include <vector>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/range.hpp>
+
+namespace qpid {
+
+namespace framing {
+class MessageTransferBody;
+class MessageAppendBody;
+}
+
+namespace broker {
+
+class CompletionHandler;
+class ReferenceRegistry;
+
+/**
+ * A reference is an accumulation point for data in a multi-frame
+ * message. A reference can be used by multiple transfer commands, so
+ * the reference tracks which commands are using it. When the reference
+ * is closed, all the associated transfers are completed.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class Reference
+{
+  public:
+    typedef std::string Id;
+    typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
+    typedef std::vector<TransferPtr> Transfers;
+    typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
+    typedef std::vector<AppendPtr> Appends;
+
+    Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
+        : id(id_), registry(reg) {}
+    
+    const std::string& getId() const { return id; }
+
+    /** Add a transfer to be completed with this reference */
+    void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+
+    /** Append more data to the reference */
+    void append(AppendPtr ptr) { appends.push_back(ptr); }
+
+    /** Close the reference, complete each associated transfer */
+    void close();
+
+    const Appends& getAppends() const { return appends; }
+    const Transfers& getTransfers() const { return transfers; }
+    
+  private:
+    void complete(TransferPtr transfer);
+    
+    Id id;
+    ReferenceRegistry* registry;
+    Transfers transfers;
+    Appends appends;
+};
+
+
+/**
+ * A registry/factory for references.
+ * 
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class ReferenceRegistry {
+  public:
+    ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {};
+    Reference& open(const Reference::Id& id);
+    Reference& get(const Reference::Id& id);
+
+  private:
+    typedef std::map<Reference::Id, Reference> ReferenceMap;
+    CompletionHandler& handler;
+    ReferenceMap references;
+
+    // Reference calls references.erase() and uses handler.
+  friend class Reference;
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!_broker_Reference_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h Tue Feb  6 07:01:45 2007
@@ -64,7 +64,10 @@
 };
 
 // FIXME aconway 2007-02-01: Method context only required on Handler
-// functions, not on Proxy functions.
+// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*)
+// on AMQBody and set it during decodeing then we could get rid of the context.
+
+
 
 }} // namespace qpid::framing
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Tue Feb  6 07:01:45 2007
@@ -28,7 +28,7 @@
 #include <memory>
 #include <AMQP_HighestVersion.h>
 #include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
 #include "broker/Connection.h"
 #include "ProtocolInitiation.h"
 
@@ -39,7 +39,7 @@
 using std::string;
 using std::queue;
 
-struct DummyHandler : ConnectionOutputHandler{
+struct MockHandler : ConnectionOutputHandler{
     std::vector<AMQFrame*> frames; 
 
     void send(AMQFrame* frame){ frames.push_back(frame); }
@@ -60,7 +60,7 @@
 
     Broker::shared_ptr broker;
     Connection connection;
-    DummyHandler handler;
+    MockHandler handler;
     
     class MockMessageStore : public NullMessageStore
     {
@@ -240,10 +240,10 @@
         Channel channel(
             connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
         const string data[] = {"abcde", "fghij", "klmno"};
-
+        
         Message* msg = new BasicMessage(
             0, "my_exchange", "my_routing_key", false, false,
-            DummyChannel::basicGetBody());
+            MockChannel::basicGetBody());
 
         store.expect();
         store.stage(msg);
@@ -253,7 +253,8 @@
         store.destroy(msg);
         store.test();
 
-        Exchange::shared_ptr exchange(new FanOutExchange("my_exchange"));
+        Exchange::shared_ptr exchange  =
+            broker->getExchanges().declare("my_exchange", "fanout").first;
         Queue::shared_ptr queue(new Queue("my_queue"));
         exchange->bind(queue, "", 0);
 
@@ -333,7 +334,7 @@
     {
         BasicMessage* msg = new BasicMessage(
             0, exchange, routingKey, false, false,
-            DummyChannel::basicGetBody());
+            MockChannel::basicGetBody());
         AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
         header->setContentSize(contentSize);        
         msg->setHeader(header);

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp Tue Feb  6 07:01:45 2007
@@ -24,7 +24,7 @@
 #include <iostream>
 #include <list>
 #include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using std::list;
 using std::string;
@@ -58,7 +58,7 @@
     void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
     {
         InMemoryContent content;
-        DummyChannel channel(3);
+        MockChannel channel(3);
 
         addframes(content, inCount, in);
         content.send(channel, framesize);         

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp Tue Feb  6 07:01:45 2007
@@ -26,7 +26,7 @@
 #include <list>
 #include <sstream>
 #include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
 using std::list;
 using std::string;
 using boost::dynamic_pointer_cast;
@@ -92,7 +92,7 @@
     {
         TestMessageStore store(in);
         LazyLoadedContent content(&store, 0, in.size());
-        DummyChannel channel(3);
+        MockChannel channel(3);
         content.send(channel, framesize);         
         CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Tue Feb  6 07:01:45 2007
@@ -9,12 +9,6 @@
   -I$(top_srcdir)/lib/common/framing	\
   $(APR_CXXFLAGS)
 
-EXTRA_DIST =		\
-  topictest		\
-  qpid_test_plugin.h	\
-  MockConnectionInputHandler.h
-
-
 client_exe_tests =	\
   client_test		\
   echo_service		\
@@ -32,6 +26,7 @@
   MessageBuilderTest	\
   MessageHandlerTest	\
   MessageTest		\
+  ReferenceTest         \
   QueueRegistryTest	\
   QueueTest		\
   QueuePolicyTest	\
@@ -69,7 +64,14 @@
 
 CLIENT_TESTS = client_test quick_topictest
 TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker
-EXTRA_DIST += $(TESTS) topictest
+
+EXTRA_DIST =		\
+  $(TESTS) 		\
+  topictest		\
+  qpid_test_plugin.h	\
+  MockConnectionInputHandler.h \
+  MockChannel.h		\
+  InProcessBroker.h
 
 include gen.mk
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp Tue Feb  6 07:01:45 2007
@@ -26,7 +26,7 @@
 #include <qpid_test_plugin.h>
 #include <iostream>
 #include <memory>
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using namespace boost;
 using namespace qpid::broker;
@@ -35,10 +35,10 @@
 
 class MessageBuilderTest : public CppUnit::TestCase  
 {
-    struct DummyHandler : MessageBuilder::CompletionHandler{
+    struct MockHandler : CompletionHandler {
         Message::shared_ptr msg;
 
-        virtual void complete(Message::shared_ptr& _msg){
+        virtual void complete(Message::shared_ptr _msg){
             msg = _msg;
         }
     };
@@ -114,13 +114,13 @@
   public:
 
     void testHeaderOnly(){
-        DummyHandler handler;
+        MockHandler handler;
         MessageBuilder builder(&handler);
 
         Message::shared_ptr message(
             new BasicMessage(
                 0, "test", "my_routing_key", false, false,
-                DummyChannel::basicGetBody()));
+                MockChannel::basicGetBody()));
         AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
         header->setContentSize(0);
         
@@ -132,14 +132,14 @@
     }
 
     void test1ContentFrame(){
-        DummyHandler handler;
+        MockHandler handler;
         MessageBuilder builder(&handler);
 
         string data1("abcdefg");
 
         Message::shared_ptr message(
             new BasicMessage(0, "test", "my_routing_key", false, false,
-                             DummyChannel::basicGetBody()));
+                             MockChannel::basicGetBody()));
         AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
         header->setContentSize(7);
         AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +154,7 @@
     }
 
     void test2ContentFrames(){
-        DummyHandler handler;
+        MockHandler handler;
         MessageBuilder builder(&handler);
 
         string data1("abcdefg");
@@ -162,7 +162,7 @@
 
         Message::shared_ptr message(
             new BasicMessage(0, "test", "my_routing_key", false, false,
-                             DummyChannel::basicGetBody()));
+                             MockChannel::basicGetBody()));
         AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
         header->setContentSize(14);
         AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -185,7 +185,7 @@
         //loaded content is in use)
         TestMessageStore store(14);
         {
-            DummyHandler handler;
+            MockHandler handler;
             MessageBuilder builder(&handler, &store, 5);
             
             string data1("abcdefg");
@@ -193,7 +193,7 @@
             
             Message::shared_ptr message(
                 new BasicMessage(0, "test", "my_routing_key", false, false,
-                                 DummyChannel::basicGetBody()));
+                                 MockChannel::basicGetBody()));
             AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
             header->setContentSize(14);
             BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp Tue Feb  6 07:01:45 2007
@@ -23,7 +23,7 @@
 #include <iostream>
 #include <AMQP_HighestVersion.h>
 #include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using namespace boost;
 using namespace qpid::broker;
@@ -47,7 +47,7 @@
 
         BasicMessage::shared_ptr msg(
             new BasicMessage(0, exchange, routingKey, false, false,
-                             DummyChannel::basicGetBody()));
+                             MockChannel::basicGetBody()));
         AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
         header->setContentSize(14);        
         AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -73,7 +73,7 @@
         CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
         CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
 
-        DummyChannel channel(1);
+        MockChannel channel(1);
         // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion&
         msg->deliver(channel, "ignore", 0, 100); 
         CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());

Copied: incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h (from r503923, incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h?view=diff&rev=504172&p1=incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h&r1=503923&p2=incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h Tue Feb  6 07:01:45 2007
@@ -1,5 +1,5 @@
-#ifndef _tests_DummyChannel_h
-#define _tests_DummyChannel_h
+#ifndef _tests_MockChannel_h
+#define _tests_MockChannel_h
 
 /*
  *
@@ -26,16 +26,16 @@
 #include "framing/AMQFrame.h"
 #include "BasicGetBody.h"
 
-/** Dummy output handler to collect frames */
-struct DummyOutputHandler : public qpid::framing::OutputHandler {
+/** Mock output handler to collect frames */
+struct MockOutputHandler : public qpid::framing::OutputHandler {
     std::vector<qpid::framing::AMQFrame*> frames;
     void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); }
 };
 
 /**
- * Combination dummy OutputHandler and ChannelAdapter for tests.
+ * Combination mock OutputHandler and ChannelAdapter for tests.
  */
-struct DummyChannel : public qpid::framing::ChannelAdapter
+struct MockChannel : public qpid::framing::ChannelAdapter
 {
     typedef qpid::framing::BasicGetBody Body;
     static Body::shared_ptr basicGetBody() {
@@ -43,9 +43,9 @@
             new Body(qpid::framing::ProtocolVersion()));
     }
 
-    DummyOutputHandler out;
+    MockOutputHandler out;
 
-    DummyChannel(qpid::framing::ChannelId id) {
+    MockChannel(qpid::framing::ChannelId id) {
         init(id, out, qpid::framing::ProtocolVersion());
     }
 
@@ -66,4 +66,4 @@
 
 };
 
-#endif  /*!_tests_DummyChannel_h*/
+#endif  /*!_tests_MockChannel_h*/

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp Tue Feb  6 07:01:45 2007
@@ -22,7 +22,7 @@
 #include <QueueRegistry.h>
 #include <qpid_test_plugin.h>
 #include <iostream>
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -58,7 +58,7 @@
     Message::shared_ptr message(std::string exchange, std::string routingKey) {
         return Message::shared_ptr(
             new BasicMessage(0, exchange, routingKey, true, true,
-                             DummyChannel::basicGetBody()));
+                             MockChannel::basicGetBody()));
     }
     
     void testConsumers(){

Added: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp Tue Feb  6 07:01:45 2007
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "qpid_test_plugin.h"
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "MessageTransferBody.h"
+#include "MessageAppendBody.h"
+#include "CompletionHandler.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+class ReferenceTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(ReferenceTest);
+    CPPUNIT_TEST(testRegistry);
+    CPPUNIT_TEST(testReference);
+    CPPUNIT_TEST_SUITE_END();
+
+
+    struct MockCompletionHandler : public CompletionHandler {
+        std::vector<Message::shared_ptr> messages;
+        void complete(Message::shared_ptr msg) { messages.push_back(msg); }
+    };
+
+    MockCompletionHandler handler;
+    ProtocolVersion v;
+    ReferenceRegistry registry;
+    MessageTransferBody::shared_ptr t1, t2;
+    MessageAppendBody::shared_ptr a1, a2;
+  public:
+
+    ReferenceTest() :
+        registry(handler),
+        t1(new MessageTransferBody(v)),
+        t2(new MessageTransferBody(v)),
+        a1(new MessageAppendBody(v)),
+        a2(new MessageAppendBody(v))
+    {}
+
+    void testRegistry() {
+        Reference& ref = registry.open("foo");
+        CPPUNIT_ASSERT_EQUAL(string("foo"), ref.getId());
+        CPPUNIT_ASSERT(&ref == &registry.get("foo"));
+        try {
+            registry.get("none");
+            CPPUNIT_FAIL("Expected exception");
+        } catch (...) {}
+        try {
+            registry.open("foo");
+            CPPUNIT_FAIL("Expected exception");
+        } catch(...) {}
+    }
+
+    MessageMessage& handlerMessage(size_t i) {
+        CPPUNIT_ASSERT(handler.messages.size() > i);
+        MessageMessage* msg = dynamic_cast<MessageMessage*>(
+            handler.messages[i].get());
+        CPPUNIT_ASSERT(msg);
+        return *msg;
+    }
+    
+    void testReference() {
+        Reference& ref = registry.open("foo");
+        ref.transfer(t1);
+        ref.transfer(t2);
+        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size());
+        ref.append(a1);
+        ref.append(a2);
+        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size());
+        ref.close();
+        try {
+            registry.open("foo");
+            CPPUNIT_FAIL("Expected exception");
+        } catch(...) {}
+
+        vector<Message::shared_ptr>& messages = handler.messages;
+        CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size());
+
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1);
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2);
+
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2);
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2);
+    }
+                             
+    
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest);

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp Tue Feb  6 07:01:45 2007
@@ -25,7 +25,7 @@
 #include <iostream>
 #include <list>
 #include <vector>
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using std::list;
 using std::vector;
@@ -72,7 +72,7 @@
         for(int i = 0; i < 10; i++){
             Message::shared_ptr msg(
                 new BasicMessage(0, "exchange", "routing_key", false, false,
-                                 DummyChannel::basicGetBody()));
+                                 MockChannel::basicGetBody()));
             msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
             msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
             messages.push_back(msg);

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp Tue Feb  6 07:01:45 2007
@@ -25,7 +25,7 @@
 #include <iostream>
 #include <list>
 #include <vector>
-#include "DummyChannel.h"
+#include "MockChannel.h"
 
 using std::list;
 using std::pair;
@@ -78,7 +78,7 @@
         queue1(new Queue("queue1", false, &store, 0)), 
         queue2(new Queue("queue2", false, &store, 0)), 
         msg(new BasicMessage(0, "exchange", "routing_key", false, false,
-                             DummyChannel::basicGetBody())),
+                             MockChannel::basicGetBody())),
         op(msg, &xid)
     {
         msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker Tue Feb  6 07:01:45 2007
@@ -11,4 +11,4 @@
 
 # FIXME aconway 2007-01-18: qpidd should not return till it is accepting
 # connections, remove arbitrary sleep.
-sleep 2
+sleep 5