You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/21 20:12:15 UTC

svn commit: r520972 - in /incubator/qpid/trunk/qpid/cpp: lib/client/ lib/common/ lib/common/framing/ tests/

Author: aconway
Date: Wed Mar 21 12:12:14 2007
New Revision: 520972

URL: http://svn.apache.org/viewvc?view=rev&rev=520972
Log:
Refactored client side for dual-mode Channel supporting either 0-9 Message or 0-8 Basic.

Added:
    incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
    incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
    incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am
    incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am
    incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h
    incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h
    incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
    incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp

Added: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,102 @@
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ * 
+ * \ingroup clientapi
+ */
+enum AckMode {
+    /** No acknowledgement will be sent, broker can
+        discard messages as soon as they are delivered
+        to a consumer using this mode. **/
+    NO_ACK     = 0,  
+    /** Each message will be automatically
+        acknowledged as soon as it is delivered to the
+        application **/  
+    AUTO_ACK   = 1,  
+    /** Acknowledgements will be sent automatically,
+        but not for each message. **/
+    LAZY_ACK   = 2,
+    /** The application is responsible for explicitly
+        acknowledging messages. **/  
+    CLIENT_ACK = 3 
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_AckMode_h*/
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ * 
+ * \ingroup clientapi
+ */
+enum AckMode {
+    /** No acknowledgement will be sent, broker can
+        discard messages as soon as they are delivered
+        to a consumer using this mode. **/
+    NO_ACK     = 0,  
+    /** Each message will be automatically
+        acknowledged as soon as it is delivered to the
+        application **/  
+    AUTO_ACK   = 1,  
+    /** Acknowledgements will be sent automatically,
+        but not for each message. **/
+    LAZY_ACK   = 2,
+    /** The application is responsible for explicitly
+        acknowledging messages. **/  
+    CLIENT_ACK = 3 
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_AckMode_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp Wed Mar 21 12:12:14 2007
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "BasicMessageChannel.h"
+#include "AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "framing/FieldTable.h"
+#include "Connection.h"
+
+using namespace std;
+
+namespace qpid {
+namespace client {
+
+using namespace sys;
+using namespace framing;
+
+BasicMessageChannel::BasicMessageChannel(Channel& ch)
+    : channel(ch), returnsHandler(0) {}
+
+void BasicMessageChannel::consume(
+    Queue& queue, std::string& tag, MessageListener* listener, 
+    AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+    channel.sendAndReceiveSync<BasicConsumeOkBody>(
+        synch,
+        new BasicConsumeBody(
+            channel.version, 0, queue.getName(), tag, noLocal,
+            ackMode == NO_ACK, false, !synch,
+            fields ? *fields : FieldTable()));
+    if (synch) {
+        BasicConsumeOkBody::shared_ptr response =
+            boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
+                channel.responses.getResponse());
+        tag = response->getConsumerTag();
+    }
+    // FIXME aconway 2007-02-20: Race condition!
+    // We could receive the first message for the consumer
+    // before we create the consumer below.
+    // Move consumer creation to handler for BasicConsumeOkBody
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i != consumers.end())
+            THROW_QPID_ERROR(CLIENT_ERROR,
+                             "Consumer already exists with tag="+tag);
+        Consumer& c = consumers[tag];
+        c.listener = listener;
+        c.ackMode = ackMode;
+        c.lastDeliveryTag = 0;
+    }
+}
+
+
+void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
+    Consumer c;
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i == consumers.end())
+            return;
+        c = i->second;
+        consumers.erase(i);
+    }
+    if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
+        channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+    channel.sendAndReceiveSync<BasicCancelOkBody>(
+        synch, new BasicCancelBody(channel.version, tag, !synch));
+}
+
+void BasicMessageChannel::close(){
+    ConsumerMap consumersCopy;
+    {
+        Mutex::ScopedLock l(lock);
+        consumersCopy = consumers;
+        consumers.clear();
+    }
+    for (ConsumerMap::iterator i=consumersCopy.begin();
+         i  != consumersCopy.end(); ++i)
+    {
+        Consumer& c = i->second;
+        if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+            && c.lastDeliveryTag > 0)
+        {
+            channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+        }
+    }
+    incoming.shutdown();
+}
+
+
+
+bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+    // Expect a message starting with a BasicGetOk
+    incoming.startGet();
+    channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+    return incoming.waitGet(msg);
+}
+
+void BasicMessageChannel::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    msg.getHeader()->setContentSize(msg.getData().size());
+    const string e = exchange.getName();
+    string key = routingKey;
+    channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
+    //break msg up into header frame and content frame(s) and send these
+    channel.send(msg.getHeader());
+    string data = msg.getData();
+    u_int64_t data_length = data.length();
+    if(data_length > 0){
+        //frame itself uses 8 bytes
+        u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+        if(data_length < frag_size){
+            channel.send(new AMQContentBody(data));
+        }else{
+            u_int32_t offset = 0;
+            u_int32_t remaining = data_length - offset;
+            while (remaining > 0) {
+                u_int32_t length = remaining > frag_size ? frag_size : remaining;
+                string frag(data.substr(offset, length));
+                channel.send(new AMQContentBody(frag));                          
+                
+                offset += length;
+                remaining = data_length - offset;
+            }
+        }
+    }
+}
+
+void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+    assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+    switch(method->amqpMethodId()) {
+      case BasicDeliverBody::METHOD_ID:
+      case BasicReturnBody::METHOD_ID:
+      case BasicGetOkBody::METHOD_ID:
+      case BasicGetEmptyBody::METHOD_ID:
+        incoming.add(method);   
+        return;
+    }
+    throw Channel::UnknownMethod();
+}
+
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){
+    incoming.add(body);
+}
+    
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){
+    incoming.add(body);
+}
+
+void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
+    //record delivery tag:
+    consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+    //allow registered listener to handle the message
+    consumer.listener->received(msg);
+
+    if(channel.isOpen()){
+        bool multiple(false);
+        switch(consumer.ackMode){
+          case LAZY_ACK: 
+            multiple = true;
+            if(++(consumer.count) < channel.getPrefetch())
+                break;
+            //else drop-through
+          case AUTO_ACK:
+            consumer.lastDeliveryTag = 0;
+            channel.send(
+                new BasicAckBody(
+                    channel.version, msg.getDeliveryTag(), multiple));
+          case NO_ACK:          // Nothing to do
+          case CLIENT_ACK:      // User code must ack.
+            break;
+            // TODO aconway 2007-02-22: Provide a way for user
+            // to ack!
+        }
+    }
+
+    //as it stands, transactionality is entirely orthogonal to ack
+    //mode, though the acks will not be processed by the broker under
+    //a transaction until it commits.
+}
+
+
+void BasicMessageChannel::run() {
+    while(channel.isOpen()) {
+        try {
+            Message msg = incoming.waitDispatch();
+            if(msg.getMethod()->isA<BasicReturnBody>()) {
+                ReturnedMessageHandler* handler=0;
+                {
+                    Mutex::ScopedLock l(lock);
+                    handler=returnsHandler;
+                }
+                if(handler == 0) {
+                    // TODO aconway 2007-02-20: proper logging.
+                    cout << "Message returned: " << msg.getData() << endl;
+                }
+                else 
+                    handler->returned(msg);
+            }
+            else {
+                BasicDeliverBody::shared_ptr deliverBody =
+                    boost::shared_polymorphic_downcast<BasicDeliverBody>(
+                        msg.getMethod());
+                std::string tag = deliverBody->getConsumerTag();
+                Consumer consumer;
+                {
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i == consumers.end()) 
+                        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+                                         "Unknown consumer tag=" + tag);
+                    consumer = i->second;
+                }
+                deliver(consumer, msg);
+            }
+        }
+        catch (const ShutdownException&) {
+            /* Orderly shutdown */
+        }
+        catch (const Exception& e) {
+            // FIXME aconway 2007-02-20: Report exception to user.
+            cout << "client::Basic::run() terminated by: " << e.toString()
+                 << "(" << typeid(e).name() << ")" << endl;
+        }
+    }
+}
+
+void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+    Mutex::ScopedLock l(lock);
+    returnsHandler = handler;
+}
+
+void BasicMessageChannel::setQos(){
+    channel.sendAndReceive<BasicQosOkBody>(
+        new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+    if(channel.isTransactional())
+        channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+}
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,87 @@
+#ifndef _client_BasicMessageChannel_h
+#define _client_BasicMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation using AMQP 0-8 BasicMessageChannel class
+ * to send and receiving messages.
+ */
+class BasicMessageChannel : public MessageChannel
+{
+  public:
+    BasicMessageChannel(Channel& parent);
+    
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
+        
+    void cancel(const std::string& tag, bool synch = true);
+
+    bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false);
+
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+    void run();
+
+    void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+    void handle(shared_ptr<framing::AMQHeaderBody>);
+
+    void handle(shared_ptr<framing::AMQContentBody>);
+    
+    void setQos();
+    
+    void close();
+
+  private:
+
+    struct Consumer{
+        MessageListener* listener;
+        AckMode ackMode;
+        int count;
+        u_int64_t lastDeliveryTag;
+    };
+
+    typedef std::map<std::string, Consumer> ConsumerMap;
+
+    void deliver(Consumer& consumer, Message& msg);
+    
+    sys::Mutex lock;
+    Channel& channel;
+    IncomingMessage incoming;
+    ConsumerMap consumers;
+    ReturnedMessageHandler* returnsHandler;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_BasicMessageChannel_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp Wed Mar 21 12:12:14 2007
@@ -25,6 +25,9 @@
 #include <QpidError.h>
 #include <MethodBodyInstances.h>
 #include "Connection.h"
+#include "BasicMessageChannel.h"
+// FIXME aconway 2007-03-21: 
+//#include "MessageMessageChannel.h"
 
 // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
 // handling of errors that should close the connection or the channel.
@@ -36,8 +39,10 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-Channel::Channel(bool _transactional, uint16_t _prefetch) :
-    basic(*this),
+Channel::Channel(bool _transactional, u_int16_t _prefetch,
+                  MessageChannel* impl) :
+    // FIXME aconway 2007-03-21: MessageMessageChannel
+    messaging(impl ? impl : new BasicMessageChannel(*this)),
     connection(0), 
     prefetch(_prefetch), 
     transactional(_transactional)
@@ -115,7 +120,7 @@
 bool Channel::isOpen() const { return connection; }
 
 void Channel::setQos() {
-    basic.setQos();
+    messaging->setQos();
     // FIXME aconway 2007-02-22: message
 }
 
@@ -192,7 +197,7 @@
     }
     try {
         switch (method->amqpClassId()) {
-          case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+          case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
           case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
           case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
           default: throw UnknownMethod();
@@ -226,11 +231,11 @@
 }
 
 void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
-    basic.incoming.add(body);
+    messaging->handle(body);
 }
     
 void Channel::handleContent(AMQContentBody::shared_ptr body){
-    basic.incoming.add(body);
+    messaging->handle(body);
 }
 
 void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -238,7 +243,7 @@
 }
 
 void Channel::start(){
-    basicDispatcher = Thread(basic);
+    dispatcher = Thread(*messaging);
 }
 
 // Close called by local application.
@@ -274,13 +279,12 @@
 void Channel::closeInternal() {
     if (isOpen());
     {
-        basic.cancelAll();
-        basic.incoming.shutdown();
+        messaging->close();
         connection = 0;
         // A 0 response means we are closed.
         responses.signalResponse(AMQMethodBody::shared_ptr());
     }
-    basicDispatcher.join();        
+    dispatcher.join();        
 }
 
 void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
@@ -299,4 +303,31 @@
         send(body);
 }
 
+void Channel::consume(
+    Queue& queue, std::string& tag, MessageListener* listener, 
+    AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+    messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+}
+        
+void Channel::cancel(const std::string& tag, bool synch) {
+    messaging->cancel(tag, synch);
+}
+
+bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+    return messaging->get(msg, queue, ackMode);
+}
+
+void Channel::publish(const Message& msg, const Exchange& exchange,
+                      const std::string& routingKey, 
+                      bool mandatory, bool immediate) {
+    messaging->publish(msg, exchange, routingKey, mandatory, immediate);
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
+    messaging->setReturnedMessageHandler(handler);
+}
+
+void Channel::run() {
+    messaging->run();
+}
 

Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h Wed Mar 21 12:12:14 2007
@@ -21,7 +21,7 @@
  * under the License.
  *
  */
-#include "sys/types.h"
+#include <boost/scoped_ptr.hpp>
 #include <framing/amqp_framing.h>
 #include <ClientExchange.h>
 #include <ClientMessage.h>
@@ -29,7 +29,7 @@
 #include <ResponseHandler.h>
 #include "ChannelAdapter.h"
 #include "Thread.h"
-#include "Basic.h"
+#include "AckMode.h"
 
 namespace qpid {
 
@@ -41,7 +41,9 @@
 namespace client {
 
 class Connection;
-
+class MessageChannel;
+class MessageListener;
+class ReturnedMessageHandler;
 
 /**
  * Represents an AMQP channel, i.e. loosely a session of work. It
@@ -53,16 +55,12 @@
 class Channel : public framing::ChannelAdapter
 {
   private:
-    // TODO aconway 2007-02-22: Remove friendship.
-  friend class Basic;
-    // FIXME aconway 2007-02-22: friend class Message;
-    
     struct UnknownMethod {};
         
     sys::Mutex lock;
-    Basic basic;
+    boost::scoped_ptr<MessageChannel> messaging;
     Connection* connection;
-    sys::Thread basicDispatcher;
+    sys::Thread dispatcher;
     ResponseHandler responses;
 
     uint16_t prefetch;
@@ -107,7 +105,10 @@
     void closeInternal();
     void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
     
+    // FIXME aconway 2007-02-23: Get rid of friendships.
   friend class Connection;
+  friend class BasicMessageChannel; // for sendAndReceive.
+  friend class MessageMessageChannel; // for sendAndReceive.
         
   public:
 
@@ -121,8 +122,15 @@
      * @param prefetch specifies the number of unacknowledged
      * messages the channel is willing to have sent to it
      * asynchronously
-     */
-    Channel(bool transactional = false, uint16_t prefetch = 500);
+     *
+     * @param messageImpl Alternate messaging implementation class to
+     * allow alternate protocol implementations of messaging
+     * operations. Takes ownership.
+     */
+    Channel(
+        bool transactional = false, u_int16_t prefetch = 500,
+        MessageChannel* messageImpl = 0);
+     
     ~Channel();
 
     /**
@@ -190,13 +198,6 @@
               bool synch = true);
 
     /**
-     * Get a Basic object which provides functions to send and
-     * receive messages using the AMQP 0-8 Basic class methods.
-     *@see Basic
-     */
-    Basic& getBasic() { return basic; }
-
-    /**
      * For a transactional channel this will commit all
      * publications and acknowledgements since the last commit (or
      * the channel was opened if there has been no previous
@@ -243,6 +244,106 @@
     
     /** True if the channel is open */
     bool isOpen() const;
+
+    /** Get the connection associated with this channel */
+    Connection& getConnection() { return *connection; }
+
+    /** Return the protocol version */
+    framing::ProtocolVersion getVersion() const { return version ; }
+    
+    /**
+     * Creates a 'consumer' for a queue. Messages in (or arriving
+     * at) that queue will be delivered to consumers
+     * asynchronously.
+     * 
+     * @param queue a Queue instance representing the queue to
+     * consume from
+     * 
+     * @param tag an identifier to associate with the consumer
+     * that can be used to cancel its subscription (if empty, this
+     * will be assigned by the broker)
+     * 
+     * @param listener a pointer to an instance of an
+     * implementation of the MessageListener interface. Messages
+     * received from this queue for this consumer will result in
+     * invocation of the received() method on the listener, with
+     * the message itself passed in.
+     * 
+     * @param ackMode the mode of acknowledgement that the broker
+     * should assume for this consumer. @see AckMode
+     * 
+     * @param noLocal if true, this consumer will not be sent any
+     * message published by this connection
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
+        
+    /**
+     * Cancels a subscription previously set up through a call to consume().
+     *
+     * @param tag the identifier used (or assigned) in the consume
+     * request that set up the subscription to be cancelled.
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void cancel(const std::string& tag, bool synch = true);
+    /**
+     * Synchronous pull of a message from a queue.
+     * 
+     * @param msg a message object that will contain the message
+     * headers and content if the call completes.
+     * 
+     * @param queue the queue to consume from
+     * 
+     * @param ackMode the acknowledgement mode to use (@see
+     * AckMode)
+     * 
+     * @return true if a message was succcessfully dequeued from
+     * the queue, false if the queue was empty.
+     */
+    bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+    /**
+     * Publishes (i.e. sends a message to the broker).
+     * 
+     * @param msg the message to publish
+     * 
+     * @param exchange the exchange to publish the message to
+     * 
+     * @param routingKey the routing key to publish with
+     * 
+     * @param mandatory if true and the exchange to which this
+     * publish is directed has no matching bindings, the message
+     * will be returned (see setReturnedMessageHandler()).
+     * 
+     * @param immediate if true and there is no consumer to
+     * receive this message on publication, the message will be
+     * returned (see setReturnedMessageHandler()).
+     */
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false);
+
+    /**
+     * Set a handler for this channel that will process any
+     * returned messages
+     * 
+     * @see publish()
+     */
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+    /**
+     * Deliver messages from the broker to the appropriate MessageListener. 
+     */
+    void run();
+
+
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp Wed Mar 21 12:12:14 2007
@@ -30,7 +30,6 @@
 
 void Message::setData(const std::string& d) {
     data = d;
-    header->setContentSize(d.size());
 }
 
 Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){

Modified: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h Wed Mar 21 12:12:14 2007
@@ -43,6 +43,7 @@
  * 
  * Broker initiated messages (basic.return, basic.deliver) are
  * queued for handling by the user dispatch thread.
+ * 
  */
 class IncomingMessage {
   public:

Modified: incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am Wed Mar 21 12:12:14 2007
@@ -14,7 +14,7 @@
   ClientExchange.cpp				\
   ClientMessage.cpp				\
   ClientQueue.cpp				\
-  Basic.cpp					\
+  BasicMessageChannel.cpp			\
   Connection.cpp				\
   Connector.cpp					\
   IncomingMessage.cpp				\
@@ -22,14 +22,16 @@
   ResponseHandler.cpp				\
   ReturnedMessageHandler.cpp
 pkginclude_HEADERS =				\
+  AckMode.h					\
   ClientChannel.h				\
   ClientExchange.h				\
   ClientMessage.h				\
   ClientQueue.h					\
-  Basic.h					\
   Connection.h					\
   Connector.h					\
   IncomingMessage.h				\
+  MessageChannel.h				\
+  BasicMessageChannel.h				\
   MessageListener.h				\
   MethodBodyInstances.h				\
   ResponseHandler.h				\

Added: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,94 @@
+#ifndef _client_MessageChannel_h
+#define _client_MessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "shared_ptr.h"
+#include "sys/Runnable.h"
+#include "AckMode.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class FieldTable;
+}
+
+namespace client {
+
+class Channel;
+class Message;
+class Queue;
+class Exchange;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * Abstract interface for messaging implementation for a channel.
+ * 
+ *@see Channel for documentation.
+  */
+class MessageChannel : public sys::Runnable
+{
+  public:
+    /**@see Channel::consume */
+    virtual void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0) = 0;
+        
+    /**@see Channel::cancel */
+    virtual void cancel(const std::string& tag, bool synch = true) = 0;
+
+    /**@see Channel::get */
+    virtual bool get(
+        Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0;
+
+    /**@see Channel::get */
+    virtual void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false) = 0;
+
+    /**@see Channel::setReturnedMessageHandler */
+    virtual void setReturnedMessageHandler(
+        ReturnedMessageHandler* handler) = 0;
+
+    /** Handle an incoming method. */
+    virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0;
+
+    /** Handle an incoming header */
+    virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0;
+
+    /** Handle an incoming content */
+    virtual void handle(shared_ptr<framing::AMQContentBody>) = 0;
+    
+    /** Send channel's QOS settings */
+    virtual void setQos() = 0;
+
+    /** Channel is closing */
+    virtual void close() = 0;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_MessageChannel_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am Wed Mar 21 12:12:14 2007
@@ -117,6 +117,7 @@
   $(framing)/amqp_framing.h			\
   $(framing)/amqp_types.h			\
   $(framing)/Proxy.h				\
+  shared_ptr.h					\
   Exception.h					\
   ExceptionHolder.h				\
   QpidError.h					\

Modified: incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h Wed Mar 21 12:12:14 2007
@@ -30,6 +30,7 @@
     bool isInline() const { return discriminator == INLINE; }
     bool isReference() const { return discriminator == REFERENCE; }
     const string& getValue() const { return value; }
+    void setValue(const string& newValue) { value = newValue; }
 
     friend std::ostream& operator<<(std::ostream&, const Content&);
 };    

Added: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,31 @@
+#ifndef _common_shared_ptr_h
+#define _common_shared_ptr_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+/// Import shared_ptr into qpid namespace.
+using boost::shared_ptr;
+} // namespace qpid
+
+
+
+#endif  /*!_common_shared_ptr_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp Wed Mar 21 12:12:14 2007
@@ -32,6 +32,10 @@
 using namespace qpid::sys;
 using namespace qpid::framing;
 
+/// Small frame size so we can create fragmented messages.
+const size_t FRAME_MAX = 256;
+
+
 /**
  * Test client API using an in-process broker.
  */
@@ -42,6 +46,8 @@
     CPPUNIT_TEST(testGetNoContent);
     CPPUNIT_TEST(testConsumeCancel);
     CPPUNIT_TEST(testConsumePublished);
+    CPPUNIT_TEST(testGetFragmentedMessage);
+    CPPUNIT_TEST(testConsumeFragmentedMessage);
     CPPUNIT_TEST_SUITE_END();
 
     struct Listener: public qpid::client::MessageListener {
@@ -65,7 +71,8 @@
   public:
 
     ClientChannelTest()
-        : qname("testq"), data("hello"),
+        : connection(FRAME_MAX),
+          qname("testq"), data("hello"),
           queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
     {
         connection.openChannel(channel);
@@ -76,21 +83,21 @@
     void testPublishGet() {
         Message pubMsg(data);
         pubMsg.getHeaders().setString("hello", "world");
-        channel.getBasic().publish(pubMsg, exchange, qname);
+        channel.publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+        CPPUNIT_ASSERT(channel.get(getMsg, queue));
         CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
-        CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
+        CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
     }
 
     void testGetNoContent() {
         Message pubMsg;
         pubMsg.getHeaders().setString("hello", "world");
-        channel.getBasic().publish(pubMsg, exchange, qname);
+        channel.publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+        CPPUNIT_ASSERT(channel.get(getMsg, queue));
         CPPUNIT_ASSERT(getMsg.getData().empty());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
@@ -98,10 +105,10 @@
 
     void testConsumeCancel() {
         string tag;             // Broker assigned
-        channel.getBasic().consume(queue, tag, &listener);
+        channel.consume(queue, tag, &listener);
         channel.start();
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
-        channel.getBasic().publish(Message("a"), exchange, qname);
+        channel.publish(Message("a"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +119,8 @@
         CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
         CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
             
-        channel.getBasic().publish(Message("b"), exchange, qname);
-        channel.getBasic().publish(Message("c"), exchange, qname);
+        channel.publish(Message("b"), exchange, qname);
+        channel.publish(Message("c"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             while (listener.messages.size() != 3) {
@@ -124,15 +131,15 @@
         CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
         CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
     
-        channel.getBasic().cancel(tag);
-        channel.getBasic().publish(Message("d"), exchange, qname);
+        channel.cancel(tag);
+        channel.publish(Message("d"), exchange, qname);
         CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
         {
             Mutex::ScopedLock l(listener.monitor);
             CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
         }
         Message msg;
-        CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
+        CPPUNIT_ASSERT(channel.get(msg, queue));
         CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
     }
 
@@ -140,9 +147,9 @@
     void testConsumePublished() {
         Message pubMsg("x");
         pubMsg.getHeaders().setString("y", "z");
-        channel.getBasic().publish(pubMsg, exchange, qname);
+        channel.publish(pubMsg, exchange, qname);
         string tag;
-        channel.getBasic().consume(queue, tag, &listener);
+        channel.consume(queue, tag, &listener);
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
         channel.start();
         {
@@ -155,8 +162,40 @@
                              listener.messages[0].getHeaders().getString("y"));
     }
 
+    void testGetFragmentedMessage() {
+        string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
+        channel.publish(Message(longStr), exchange, qname);
+        // FIXME aconway 2007-03-21: Remove couts.
+        cout << "==== Fragmented publish:" << endl
+             << connection.conversation << endl;
+        Message getMsg;
+        cout << "==== Fragmented get:" << endl
+             << connection.conversation << endl;
+        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+    }
     
-        
+    void testConsumeFragmentedMessage() {
+        string xx(FRAME_MAX*2, 'x');
+        channel.publish(Message(xx), exchange, qname);
+        cout << "==== Fragmented publish:" << endl
+             << connection.conversation << endl;
+        channel.start();
+        string tag;
+        channel.consume(queue, tag, &listener);
+        string yy(FRAME_MAX*2, 'y');
+        channel.publish(Message(yy), exchange, qname);
+        {
+            Mutex::ScopedLock l(listener.monitor);
+            while (listener.messages.size() != 2)
+                CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+        }
+        // FIXME aconway 2007-03-21: 
+        cout << "==== Fragmented consme 2 messages:" << endl
+             << connection.conversation << endl;
+
+        CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData());
+        CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData());
+    }
 };
 
 // Make this test suite a plugin.

Modified: incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h Wed Mar 21 12:12:14 2007
@@ -145,25 +145,30 @@
     return out;
 }
 
+} // namespace broker
 
-}} // namespace qpid::broker
-
+namespace client {
 /** An in-process client+broker all in one. */
-class InProcessBrokerClient : public qpid::client::Connection {
+class InProcessBrokerClient : public client::Connection {
   public:
-    qpid::broker::InProcessBroker broker;
-    qpid::broker::InProcessBroker::Conversation& conversation;
+    broker::InProcessBroker broker;
+    broker::InProcessBroker::Conversation& conversation;
     
     /** Constructor creates broker and opens client connection. */
-    InProcessBrokerClient(qpid::framing::ProtocolVersion version=
-                          qpid::framing::highestProtocolVersion
-    ) : broker(version), conversation(broker.conversation)
+    InProcessBrokerClient(
+        u_int32_t max_frame_size=65536,
+        framing::ProtocolVersion version= framing::highestProtocolVersion
+    ) : client::Connection(false, max_frame_size, version),
+        broker(version),
+        conversation(broker.conversation)
     {
         setConnector(broker);
         open("");
     }
-
-    ~InProcessBrokerClient() {}
 };
+
+
+}} // namespace qpid::client
+
 
 #endif // _tests_InProcessBroker_h

Modified: incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp Wed Mar 21 12:12:14 2007
@@ -30,8 +30,9 @@
 #include "AMQP_HighestVersion.h"
 #include "sys/AtomicCount.h"
 
-using namespace qpid::sys;
-using namespace qpid::framing;
+using namespace qpid;
+using namespace sys;
+using namespace framing;
 using namespace boost;
 using namespace std;
 
@@ -99,7 +100,7 @@
     CPPUNIT_TEST_SUITE_END();
 
   public:
-    InProcessBrokerClient client;
+    client::InProcessBrokerClient client;
     ProducerConsumer pc;
 
     WatchedCounter stopped;
@@ -166,7 +167,7 @@
     }
 
 public:
-    ProducerConsumerTest() : client(highestProtocolVersion) {}
+    ProducerConsumerTest() : client() {}
 
     void testProduceConsume() {
         ConsumeRunnable runMe(*this);

Modified: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp Wed Mar 21 12:12:14 2007
@@ -102,7 +102,7 @@
 	Monitor monitor;
 	SimpleListener listener(&monitor);
 	string tag("MyTag");
-	channel.getBasic().consume(queue, tag, &listener);
+	channel.consume(queue, tag, &listener);
 	if (verbose) std::cout << "Registered consumer." << std::endl;
 
         //we need to enable the message dispatching for this channel
@@ -115,7 +115,7 @@
 	Message msg;
 	string data("MyMessage");
 	msg.setData(data);
-	channel.getBasic().publish(msg, exchange, "MyTopic");
+	channel.publish(msg, exchange, "MyTopic");
 	if (verbose) std::cout << "Published message: " << data << std::endl;
 
 	{

Modified: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp Wed Mar 21 12:12:14 2007
@@ -116,7 +116,7 @@
             //Consume from the response queue, logging all echoed message to console:
             LoggingListener listener;
             std::string tag;
-            channel.getBasic().consume(response, tag, &listener);
+            channel.consume(response, tag, &listener);
 
             //Process incoming requests on a new thread
             channel.start();
@@ -129,7 +129,7 @@
                 Message msg;
                 msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
                 msg.setData(text);
-                channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
+                channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
                 
                 std::cout << "Enter text to send:" << std::endl;
             }
@@ -158,10 +158,10 @@
             //Consume from the request queue, echoing back all messages received to the client that sent them
             EchoServer server(&channel);
             std::string tag = "server_tag";
-            channel.getBasic().consume(request, tag, &server);
+            channel.consume(request, tag, &server);
 
             //Process incoming requests on the main thread
-            channel.getBasic().run();
+            channel.run();
             
             connection.close();
         } catch(qpid::QpidError error) {
@@ -184,7 +184,7 @@
         std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
         
         //'echo' the message back:
-        channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
+        channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp Wed Mar 21 12:12:14 2007
@@ -119,9 +119,9 @@
             //set up listener
             Listener listener(&channel, response.getName(), args.getTransactional());
             string tag;
-            channel.getBasic().consume(control, tag, &listener, args.getAckMode());
+            channel.consume(control, tag, &listener, args.getAckMode());
             cout << "topic_listener: Consuming." << endl;
-            channel.getBasic().run();
+            channel.run();
             connection.close();
             cout << "topic_listener: normal exit" << endl;
             return 0;
@@ -166,7 +166,7 @@
               << time/TIME_MSEC << " ms.";
     Message msg(reportstr.str());
     msg.getHeaders().setString("TYPE", "REPORT");
-    channel->getBasic().publish(msg, string(), responseQueue);
+    channel->publish(msg, string(), responseQueue);
     if(transactional){
         channel->commit();
     }

Modified: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp Wed Mar 21 12:12:14 2007
@@ -129,7 +129,7 @@
             //set up listener
             Publisher publisher(&channel, "topic_control", args.getTransactional());
             std::string tag("mytag");
-            channel.getBasic().consume(response, tag, &publisher, args.getAckMode());
+            channel.consume(response, tag, &publisher, args.getAckMode());
             channel.start();
 
             int batchSize(args.getBatches());
@@ -187,12 +187,13 @@
     {
         Monitor::ScopedLock l(monitor);
         for(int i = 0; i < msgs; i++){
-            channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+            channel->publish(
+                msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
         }
         //send report request
         Message reportRequest;
         reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
-        channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+        channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
         if(transactional){
             channel->commit();
         }
@@ -216,7 +217,7 @@
     //send termination request
     Message terminationRequest;
     terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
-    channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+    channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
     if(transactional){
         channel->commit();
     }