You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/04 17:45:37 UTC

svn commit: r525542 - in /incubator/qpid/trunk/qpid/cpp/src: Makefile.am client/ClientChannel.cpp client/ClientChannel.h client/MessageMessageChannel.cpp client/MessageMessageChannel.h tests/ClientChannelTest.cpp

Author: aconway
Date: Wed Apr  4 08:45:37 2007
New Revision: 525542

URL: http://svn.apache.org/viewvc?view=rev&rev=525542
Log:

* Made client::Channel bi-modal: 0-8 or 0-9 modes.
* Added dummy impl of client::MessageMessageChannel.
* Generalised ClientChannelTest to be able to test both modes.

Added:
    incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Apr  4 08:45:37 2007
@@ -201,7 +201,8 @@
   $(client)/ClientExchange.cpp				\
   $(client)/ClientQueue.cpp				\
   $(client)/BasicMessageChannel.cpp			\
-  $(client)/Connector.cpp					\
+  $(client)/MessageMessageChannel.cpp			\
+  $(client)/Connector.cpp				\
   $(client)/IncomingMessage.cpp				\
   $(client)/MessageListener.cpp				\
   $(client)/ResponseHandler.cpp				\
@@ -245,10 +246,11 @@
   $(client)/IncomingMessage.h				\
   $(client)/MessageChannel.h				\
   $(client)/BasicMessageChannel.h				\
-  $(client)/MessageListener.h				\
-  $(client)/MethodBodyInstances.h				\
-  $(client)/ResponseHandler.h				\
-  $(client)/ReturnedMessageHandler.h	\
+  $(client)/MessageMessageChannel.h		\
+  $(client)/MessageListener.h			\
+  $(client)/MethodBodyInstances.h		\
+  $(client)/ResponseHandler.h			\
+  $(client)/ReturnedMessageHandler.h		\
   shared_ptr.h					\
   Exception.h					\
   ExceptionHolder.h				\

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp Wed Apr  4 08:45:37 2007
@@ -26,8 +26,7 @@
 #include "MethodBodyInstances.h"
 #include "Connection.h"
 #include "BasicMessageChannel.h"
-// FIXME aconway 2007-03-21: 
-//#include "MessageMessageChannel.h"
+#include "MessageMessageChannel.h"
 
 // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
 // handling of errors that should close the connection or the channel.
@@ -39,14 +38,15 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-Channel::Channel(bool _transactional, u_int16_t _prefetch,
-                  MessageChannel* impl) :
-    // FIXME aconway 2007-03-21: MessageMessageChannel
-    messaging(impl ? impl : new BasicMessageChannel(*this)),
-    connection(0), 
-    prefetch(_prefetch), 
-    transactional(_transactional)
-{ }
+Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+    connection(0), prefetch(_prefetch), transactional(_transactional)
+{
+    switch (mode) {
+      case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
+      case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
+      default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
+    }
+}
 
 Channel::~Channel(){
     close();

Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h Wed Apr  4 08:45:37 2007
@@ -112,6 +112,7 @@
   friend class MessageMessageChannel; // for sendAndReceive.
         
   public:
+    enum InteropMode { AMQP_08, AMQP_09 };
 
     /**
      * Creates a channel object.
@@ -130,7 +131,7 @@
      */
     Channel(
         bool transactional = false, u_int16_t prefetch = 500,
-        MessageChannel* messageImpl = 0);
+        InteropMode=AMQP_08);
      
     ~Channel();
 

Added: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=auto&rev=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Wed Apr  4 08:45:37 2007
@@ -0,0 +1,331 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <boost/format.hpp>
+#include "MessageMessageChannel.h"
+#include "../framing/AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "../framing/FieldTable.h"
+#include "Connection.h"
+#include "../shared_ptr.h"
+
+namespace qpid {
+namespace client {
+
+using namespace std;
+using namespace sys;
+using namespace framing;
+
+MessageMessageChannel::MessageMessageChannel(Channel& ch)
+    : channel(ch), tagCount(0) {}
+
+string MessageMessageChannel::newTag() {
+    Mutex::ScopedLock l(lock);
+    return (boost::format("__tag%d")%++tagCount).str();
+}
+
+void MessageMessageChannel::consume(
+    Queue& queue, std::string& tag, MessageListener* /*listener*/, 
+    AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
+{
+    if (tag.empty())
+        tag = newTag();
+    channel.sendAndReceive<MessageOkBody>(
+        new MessageConsumeBody(
+            channel.getVersion(), 0, queue.getName(), tag, noLocal,
+            ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+    
+//     // FIXME aconway 2007-02-20: Race condition!
+//     // We could receive the first message for the consumer
+//     // before we create the consumer below.
+//     // Move consumer creation to handler for MessageConsumeOkBody
+//     {
+//         Mutex::ScopedLock l(lock);
+//         ConsumerMap::iterator i = consumers.find(tag);
+//         if (i != consumers.end())
+//             THROW_QPID_ERROR(CLIENT_ERROR,
+//                              "Consumer already exists with tag="+tag);
+//         Consumer& c = consumers[tag];
+//         c.listener = listener;
+//         c.ackMode = ackMode;
+//         c.lastDeliveryTag = 0;
+//     }
+}
+
+
+void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
+    // FIXME aconway 2007-02-23: 
+//     Consumer c;
+//     {
+//         Mutex::ScopedLock l(lock);
+//         ConsumerMap::iterator i = consumers.find(tag);
+//         if (i == consumers.end())
+//             return;
+//         c = i->second;
+//         consumers.erase(i);
+//     }
+//     if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
+//         channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+//     channel.sendAndReceiveSync<MessageCancelOkBody>(
+//         synch, new MessageCancelBody(channel.version, tag, !synch));
+}
+
+void MessageMessageChannel::close(){
+    // FIXME aconway 2007-02-23: 
+//     ConsumerMap consumersCopy;
+//     {
+//         Mutex::ScopedLock l(lock);
+//         consumersCopy = consumers;
+//         consumers.clear();
+//     }
+//     for (ConsumerMap::iterator i=consumersCopy.begin();
+//          i  != consumersCopy.end(); ++i)
+//     {
+//         Consumer& c = i->second;
+//         if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+//             && c.lastDeliveryTag > 0)
+//         {
+//             channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+//         }
+//     }
+//     incoming.shutdown();
+}
+
+
+/** Destination ID for the current get.
+ * Must not clash with a generated consumer ID.
+ * TODO aconway 2007-03-06: support multiple outstanding gets?
+ */
+const string getDestinationId("__get__");
+
+bool MessageMessageChannel::get(
+    Message& , const Queue& , AckMode )
+{
+    Mutex::ScopedLock l(lock);
+//     incoming.addDestination(getDestinationId, getDest);
+//     channel.send(
+//         new MessageGetBody(
+//             channel.version, 0, queue.getName(), getDestinationId, ackMode));
+//    return getDest.wait(msg);
+    return false;
+}
+
+
+/** Convert a message to a transfer command. */
+MessageTransferBody::shared_ptr makeTransfer(
+    ProtocolVersion version,
+    const Message& msg, const string& destination,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    return MessageTransferBody::shared_ptr(
+        new MessageTransferBody(
+            version,
+            0,                  // FIXME aconway 2007-04-03: ticket.
+            destination,
+            msg.isRedelivered(),
+            immediate,
+            0,                  // FIXME aconway 2007-02-23: ttl
+            msg.getPriority(),
+            msg.getTimestamp(),
+            static_cast<uint8_t>(msg.getDeliveryMode()),
+            0,                  // FIXME aconway 2007-04-03: Expiration
+            string(),           // Exchange: for broker use only.
+            routingKey,
+            msg.getMessageId(),
+            msg.getCorrelationId(),
+            msg.getReplyTo(),
+            msg.getContentType(),
+            msg.getContentEncoding(),
+            msg.getUserId(),
+            msg.getAppId(),
+            string(),       // FIXME aconway 2007-04-03: TransactionId
+            string(),        //FIXME aconway 2007-04-03: SecurityToken
+            msg.getHeaders(),
+            Content(INLINE, msg.getData()),
+            mandatory
+        ));
+}
+
+void MessageMessageChannel::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    MessageTransferBody::shared_ptr transfer = makeTransfer(
+        channel.getVersion(),
+        msg, exchange.getName(), routingKey, mandatory, immediate);
+    // Frame itself uses 8 bytes.
+    u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
+    if (transfer->size()  > frameMax) {
+        // FIXME aconway 2007-02-23: 
+        throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
+    }
+    channel.sendAndReceive<MessageOkBody>(transfer.get());
+}
+        
+
+void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+    assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
+    switch(method->amqpMethodId()) {
+      case MessageAppendBody::METHOD_ID: {
+          MessageAppendBody::shared_ptr append =
+              shared_polymorphic_downcast<MessageAppendBody>(method);
+          incoming.appendReference(append->getReference(), append->getBytes());
+          break;
+      }
+      case MessageOpenBody::METHOD_ID: {
+          MessageOpenBody::shared_ptr open =
+              shared_polymorphic_downcast<MessageOpenBody>(method);
+          incoming.openReference(open->getReference());
+          break;
+      }
+
+      case MessageCloseBody::METHOD_ID: {
+          MessageCloseBody::shared_ptr close =
+              shared_polymorphic_downcast<MessageCloseBody>(method);
+          incoming.closeReference(close->getReference());
+          break;
+      }
+
+      case MessageEmptyBody::METHOD_ID: {
+          // FIXME aconway 2007-04-04: 
+          // getDest.empty();
+          break;
+      }
+
+      case MessageCancelBody::METHOD_ID:
+      case MessageCheckpointBody::METHOD_ID:
+
+        // FIXME aconway 2007-04-03:  TODO
+      case MessageOkBody::METHOD_ID:
+      case MessageOffsetBody::METHOD_ID:
+      case MessageQosBody::METHOD_ID:
+      case MessageRecoverBody::METHOD_ID:
+      case MessageRejectBody::METHOD_ID:
+      case MessageResumeBody::METHOD_ID:
+      case MessageTransferBody::METHOD_ID:
+      default:
+        throw Channel::UnknownMethod();
+    }
+}
+
+void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){
+    throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+    
+void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){
+    throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+
+// FIXME aconway 2007-02-23: 
+// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
+//     //record delivery tag:
+//     consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+//     //allow registered listener to handle the message
+//     consumer.listener->received(msg);
+
+//     if(channel.isOpen()){
+//         bool multiple(false);
+//         switch(consumer.ackMode){
+//           case LAZY_ACK: 
+//             multiple = true;
+//             if(++(consumer.count) < channel.getPrefetch())
+//                 break;
+//             //else drop-through
+//           case AUTO_ACK:
+//             consumer.lastDeliveryTag = 0;
+//             channel.send(
+//                 new MessageAckBody(
+//                     channel.version, msg.getDeliveryTag(), multiple));
+//           case NO_ACK:          // Nothing to do
+//           case CLIENT_ACK:      // User code must ack.
+//             break;
+//             // TODO aconway 2007-02-22: Provide a way for user
+//             // to ack!
+//         }
+//     }
+
+//     //as it stands, transactionality is entirely orthogonal to ack
+//     //mode, though the acks will not be processed by the broker under
+//     //a transaction until it commits.
+// }
+
+
+void MessageMessageChannel::run() {
+    // FIXME aconway 2007-02-23: 
+//     while(channel.isOpen()) {
+//         try {
+//             Message msg = incoming.waitDispatch();
+//             if(msg.getMethod()->isA<MessageReturnBody>()) {
+//                 ReturnedMessageHandler* handler=0;
+//                 {
+//                     Mutex::ScopedLock l(lock);
+//                     handler=returnsHandler;
+//                 }
+//                 if(handler == 0) {
+//                     // TODO aconway 2007-02-20: proper logging.
+//                     cout << "Message returned: " << msg.getData() << endl;
+//                 }
+//                 else 
+//                     handler->returned(msg);
+//             }
+//             else {
+//                 MessageDeliverBody::shared_ptr deliverBody =
+//                     boost::shared_polymorphic_downcast<MessageDeliverBody>(
+//                         msg.getMethod());
+//                 std::string tag = deliverBody->getConsumerTag();
+//                 Consumer consumer;
+//                 {
+//                     Mutex::ScopedLock l(lock);
+//                     ConsumerMap::iterator i = consumers.find(tag);
+//                     if(i == consumers.end()) 
+//                         THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+//                                          "Unknown consumer tag=" + tag);
+//                     consumer = i->second;
+//                 }
+//                 deliver(consumer, msg);
+//             }
+//         }
+//         catch (const ShutdownException&) {
+//             /* Orderly shutdown */
+//         }
+//         catch (const Exception& e) {
+//             // FIXME aconway 2007-02-20: Report exception to user.
+//             cout << "client::Message::run() terminated by: " << e.toString()
+//                  << "(" << typeid(e).name() << ")" << endl;
+//         }
+//     }
+}
+
+void MessageMessageChannel::setReturnedMessageHandler(
+    ReturnedMessageHandler* )
+{
+    throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
+}
+
+void MessageMessageChannel::setQos(){
+    channel.sendAndReceive<MessageOkBody>(
+        new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+    if(channel.isTransactional())
+        channel.sendAndReceive<TxSelectOkBody>(
+            new TxSelectBody(channel.version));
+}
+
+}} // namespace qpid::client

Added: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h?view=auto&rev=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h Wed Apr  4 08:45:37 2007
@@ -0,0 +1,82 @@
+#ifndef _client_MessageMessageChannel_h
+#define _client_MessageMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+#include "../sys/Monitor.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation using AMQP 0-9 MessageMessageChannel class
+ * to send and receiving messages.
+ */
+class MessageMessageChannel : public MessageChannel
+{
+  public:
+    MessageMessageChannel(Channel& parent);
+    
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
+    
+    void cancel(const std::string& tag, bool synch = true);
+
+    bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false);
+
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+    void run();
+
+    void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+    void handle(shared_ptr<framing::AMQHeaderBody>);
+
+    void handle(shared_ptr<framing::AMQContentBody>);
+    
+    void setQos();
+    
+    void close();
+
+  private:
+    typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
+    Destinations;
+
+    std::string newTag();
+
+    sys::Mutex lock;
+    Channel& channel;
+    IncomingMessage incoming;
+    long tagCount;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_MessageMessageChannel_h*/
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Wed Apr  4 08:45:37 2007
@@ -26,6 +26,7 @@
 #include "../client/ClientQueue.h"
 #include "../client/ClientExchange.h"
 #include "../client/MessageListener.h"
+#include "../client/BasicMessageChannel.h"
 
 using namespace std;
 using namespace boost;
@@ -38,19 +39,12 @@
 
 
 /**
- * Test client API using an in-process broker.
+ * Test base for client API using an in-process broker.
+ * The test base defines the tests methods, derived classes
+ * instantiate the channel in Basic or Message mode.
  */
-class ClientChannelTest : public CppUnit::TestCase  
+class ClientChannelTestBase : public CppUnit::TestCase  
 {
-    CPPUNIT_TEST_SUITE(ClientChannelTest);
-    CPPUNIT_TEST(testPublishGet);
-    CPPUNIT_TEST(testGetNoContent);
-    CPPUNIT_TEST(testConsumeCancel);
-    CPPUNIT_TEST(testConsumePublished);
-    CPPUNIT_TEST(testGetFragmentedMessage);
-    CPPUNIT_TEST(testConsumeFragmentedMessage);
-    CPPUNIT_TEST_SUITE_END();
-
     struct Listener: public qpid::client::MessageListener {
         vector<Message> messages;
         Monitor monitor;
@@ -62,43 +56,48 @@
     };
     
     InProcessBrokerClient connection; // client::connection + local broker
-    Channel channel;
     const std::string qname;
     const std::string data;
     Queue queue;
     Exchange exchange;
     Listener listener;
 
+  protected:
+        boost::scoped_ptr<Channel> channel;
+
   public:
 
-    ClientChannelTest()
+    ClientChannelTestBase()
         : connection(FRAME_MAX),
           qname("testq"), data("hello"),
           queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
-    {
-        connection.openChannel(channel);
-        CPPUNIT_ASSERT(channel.getId() != 0);
-        channel.declareQueue(queue);
+    {}
+
+    void setUp() {
+        CPPUNIT_ASSERT(channel);
+        connection.openChannel(*channel);
+        CPPUNIT_ASSERT(channel->getId() != 0);
+        channel->declareQueue(queue);
     }
 
     void testPublishGet() {
         Message pubMsg(data);
         pubMsg.getHeaders().setString("hello", "world");
-        channel.publish(pubMsg, exchange, qname);
+        channel->publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(channel->get(getMsg, queue));
         CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
-        CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
+        CPPUNIT_ASSERT(!channel->get(getMsg, queue)); // Empty queue
     }
 
     void testGetNoContent() {
         Message pubMsg;
         pubMsg.getHeaders().setString("hello", "world");
-        channel.publish(pubMsg, exchange, qname);
+        channel->publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(channel->get(getMsg, queue));
         CPPUNIT_ASSERT(getMsg.getData().empty());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
@@ -106,10 +105,10 @@
 
     void testConsumeCancel() {
         string tag;             // Broker assigned
-        channel.consume(queue, tag, &listener);
-        channel.start();
+        channel->consume(queue, tag, &listener);
+        channel->start();
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
-        channel.publish(Message("a"), exchange, qname);
+        channel->publish(Message("a"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             Time deadline(now() + 1*TIME_SEC);
@@ -120,8 +119,8 @@
         CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
         CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
             
-        channel.publish(Message("b"), exchange, qname);
-        channel.publish(Message("c"), exchange, qname);
+        channel->publish(Message("b"), exchange, qname);
+        channel->publish(Message("c"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             while (listener.messages.size() != 3) {
@@ -132,15 +131,15 @@
         CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
         CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
     
-        channel.cancel(tag);
-        channel.publish(Message("d"), exchange, qname);
+        channel->cancel(tag);
+        channel->publish(Message("d"), exchange, qname);
         CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
         {
             Mutex::ScopedLock l(listener.monitor);
             CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
         }
         Message msg;
-        CPPUNIT_ASSERT(channel.get(msg, queue));
+        CPPUNIT_ASSERT(channel->get(msg, queue));
         CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
     }
 
@@ -148,11 +147,11 @@
     void testConsumePublished() {
         Message pubMsg("x");
         pubMsg.getHeaders().setString("y", "z");
-        channel.publish(pubMsg, exchange, qname);
+        channel->publish(pubMsg, exchange, qname);
         string tag;
-        channel.consume(queue, tag, &listener);
+        channel->consume(queue, tag, &listener);
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
-        channel.start();
+        channel->start();
         {
             Mutex::ScopedLock l(listener.monitor);
             while (listener.messages.size() != 1) 
@@ -165,19 +164,19 @@
 
     void testGetFragmentedMessage() {
         string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
-        channel.publish(Message(longStr), exchange, qname);
+        channel->publish(Message(longStr), exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(channel->get(getMsg, queue));
     }
     
     void testConsumeFragmentedMessage() {
         string xx(FRAME_MAX*2, 'x');
-        channel.publish(Message(xx), exchange, qname);
-        channel.start();
+        channel->publish(Message(xx), exchange, qname);
+        channel->start();
         string tag;
-        channel.consume(queue, tag, &listener);
+        channel->consume(queue, tag, &listener);
         string yy(FRAME_MAX*2, 'y');
-        channel.publish(Message(yy), exchange, qname);
+        channel->publish(Message(yy), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             while (listener.messages.size() != 2)
@@ -188,6 +187,23 @@
     }
 };
 
+class BasicClientChannelTest : public ClientChannelTestBase {
+    CPPUNIT_TEST_SUITE(BasicClientChannelTest);
+    CPPUNIT_TEST(testPublishGet);
+    CPPUNIT_TEST(testGetNoContent);
+    CPPUNIT_TEST(testConsumeCancel);
+    CPPUNIT_TEST(testConsumePublished);
+    CPPUNIT_TEST(testGetFragmentedMessage);
+    CPPUNIT_TEST(testConsumeFragmentedMessage);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    BasicClientChannelTest(){
+        channel.reset(new Channel(false, 500, Channel::AMQP_08));
+    }
+};
+
+
 // Make this test suite a plugin.
 CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest);