You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/29 17:13:27 UTC

svn commit: r501087 [1/2] - in /incubator/qpid/branches/qpid.0-9: cpp/ cpp/gen/ cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/framing/ cpp/lib/common/sys/apr/ cpp/tests/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/

Author: aconway
Date: Mon Jan 29 08:13:24 2007
New Revision: 501087

URL: http://svn.apache.org/viewvc?view=rev&rev=501087
Log:
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecessary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
 - MethodBodyClass.h.tmpl: Added CLASS_ID, METHOD_ID and isA() template.
 - Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/gen/make-gen-src-mk.sh
      - copied unchanged from r501057, incubator/qpid/trunk/qpid/cpp/gen/make-gen-src-mk.sh
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/bootstrap
    incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolInitiation.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRSocket.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRSocket.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/TxBufferTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp
    incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl

Modified: incubator/qpid/branches/qpid.0-9/cpp/bootstrap
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/bootstrap?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/bootstrap (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/bootstrap Mon Jan 29 08:13:24 2007
@@ -31,7 +31,8 @@
 autoconf
 
 if [ "$1" = "-build" ] ; then
-    ./configure
+    shift
+    ./configure "$@"
     make
     make check
 fi

Modified: incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am Mon Jan 29 08:13:24 2007
@@ -6,7 +6,7 @@
 # Distribute the generated sources, at least for now, since
 # the generator code is in java.
 EXTRA_DIST = $(BUILT_SOURCES)
-MAINTAINERCLEANFILES = $(BUILT_SOURCES)
+DISTCLEANFILES = $(BUILT_SOURCES) timestamp gen-src.mk
 
 # Don't attempt to run the code generator unless configure has set
 # CAN_GENERATE_CODE, indicating that the amqp.xml and tools needed
@@ -31,21 +31,7 @@
 	        -c -o . -t $(gentools_dir)/templ.cpp $(spec)
 	touch timestamp
 
-DISTCLEANFILES = gen-src.mk
 gen-src.mk: timestamp
-	( echo 'generated_sources = '\\				\
-	  && ls *.cpp | sort -u | sed 's/.*/  & \\/;$$s/ \\//';	\
-	  echo 'generated_headers = '\\				\
-	  && ls *.h | sort -u | sed 's/.*/  & \\/;$$s/ \\//';	\
-	) > $@-t
-	( echo if CAN_GENERATE_CODE;					\
-	  echo 'java_sources = '\\					\
-	    && find $(gentools_srcdir) -name '*.java'			\
-		| sort -u | sed 's/.*/  & \\/;$$s/ \\//';		\
-	  echo 'cxx_templates = '\\					\
-	    && find $(gentools_dir)/templ.cpp -name '*.tmpl'		\
-		| sort -u | sed 's/.*/  & \\/;$$s/ \\//';		\
-	  echo endif							\
-	) >> $@-t
+	./make-gen-src-mk.sh $(gentools_dir) $(gentools_srcdir) > $@-t
 	mv $@-t $@
 endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Mon Jan 29 08:13:24 2007
@@ -212,22 +212,21 @@
               
 void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
     const MethodContext& context, const string& /*outOfBand*/){
-    // FIXME aconway 2007-01-17: Assertions on all channel methods,
-    assertChannelNonZero(channel.getId());
-    if (channel.isOpen())
-        throw ConnectionException(504, "Channel already open");
     channel.open();
-    // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
+    // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
     connection.client->getChannel().openOk(context, std::string()/* ID */);
 } 
         
 void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}         
 void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} 
         
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, 
-                                                         u_int16_t /*classId*/, u_int16_t /*methodId*/){
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+    const MethodContext& context, u_int16_t /*replyCode*/,
+    const string& /*replyText*/,
+    u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
     connection.client->getChannel().closeOk(context);
-    // FIXME aconway 2007-01-18: Following line destroys this. Ugly.
+    // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
     connection.closeChannel(channel.getId()); 
 } 
         
@@ -499,13 +498,12 @@
 BrokerAdapter::BrokerAdapter(
     Channel* ch, Connection& c, Broker& b
 ) :
-    ChannelAdapter(c.getOutput(), ch->getId()),
     channel(ch),
     connection(c),
     broker(b),
     serverOps(new ServerOps(*ch,c,b))
 {
-    assert(ch);
+    init(ch->getId(), c.getOutput(), ch->getVersion());
 }
 
 void BrokerAdapter::handleMethodInContext(
@@ -544,6 +542,9 @@
 }
 
 
+bool BrokerAdapter::isOpen() const {
+    return channel->isOpen();
+}
 
 }} // namespace qpid::broker
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Mon Jan 29 08:13:24 2007
@@ -52,6 +52,8 @@
     void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
     void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
 
+    bool isOpen() const;
+    
   private:
     void handleMethodInContext(
         boost::shared_ptr<qpid::framing::AMQMethodBody> method,

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Mon Jan 29 08:13:24 2007
@@ -108,6 +108,7 @@
         MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
     ~Channel();
     bool isOpen() const { return opened; }
+    const framing::ProtocolVersion& getVersion() const { return version; }
     void open() { opened = true; }
     u_int16_t getId() const { return id; }
     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Mon Jan 29 08:13:24 2007
@@ -77,6 +77,7 @@
         MethodContext(0, &getAdapter(0)),
         header->getMajor(), header->getMinor(),
         properties, mechanisms, locales);
+    getAdapter(0).init(0, *out, client->getProtocolVersion());
 }
 
 void Connection::idleOut(){}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Mon Jan 29 08:13:24 2007
@@ -32,7 +32,7 @@
         public:
             virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
             virtual u_int32_t size() = 0;
-            virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+            virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
             virtual void encode(qpid::framing::Buffer& buffer) = 0;
             virtual void destroy() = 0;
             virtual ~Content(){}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp Mon Jan 29 08:13:24 2007
@@ -39,7 +39,7 @@
     return sum;
 }
 
-void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
 {
     for (content_iterator i = content.begin(); i != content.end(); i++) {
         if ((*i)->size() > framesize) {

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h Mon Jan 29 08:13:24 2007
@@ -35,7 +35,7 @@
         public:
             void add(qpid::framing::AMQContentBody::shared_ptr data);
             u_int32_t size();
-            void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
             void encode(qpid::framing::Buffer& buffer);
             void destroy();
             ~InMemoryContent(){}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp Mon Jan 29 08:13:24 2007
@@ -37,7 +37,7 @@
     return 0;//all content is written as soon as it is added
 }
 
-void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void LazyLoadedContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
 {
     if (expectedSize > framesize) {        
         for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {            

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h Mon Jan 29 08:13:24 2007
@@ -34,7 +34,7 @@
             LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
             void add(qpid::framing::AMQContentBody::shared_ptr data);
             u_int32_t size();
-            void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
             void encode(qpid::framing::Buffer& buffer);
             void destroy();
             ~LazyLoadedContent(){}

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp?view=auto&rev=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp Mon Jan 29 08:13:24 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AMQP_ClientOperations.h"
+#include "ClientAdapter.h"
+#include "Connection.h"
+#include "Exception.h"
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+void ClientAdapter::handleMethodInContext(
+    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+    const MethodContext& context
+)
+{
+    try{
+        method->invoke(*clientOps, context);
+    }catch(ChannelException& e){
+        connection.client->getChannel().close(
+            context, e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+        connection.closeChannel(getId());
+    }catch(ConnectionException& e){
+        connection.client->getConnection().close(
+            context, e.code, e.toString(),
+            method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        connection.client->getConnection().close(
+            context, 541/*internal error*/, e.what(),
+            method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+    channel->handleHeader(body);
+}
+
+void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) {
+    channel->handleContent(body);
+}
+
+void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+    // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
+}} // namespace qpid::client
+

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h?view=auto&rev=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h Mon Jan 29 08:13:24 2007
@@ -0,0 +1,66 @@
+#ifndef _client_ClientAdapter_h
+#define _client_ClientAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ChannelAdapter.h"
+#include "ClientChannel.h"
+
+namespace qpid {
+namespace client {
+
+class AMQMethodBody;
+class Connection;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Client objects.
+ *
+ * Owns a channel, has references to Connection and Client.
+ */
+class ClientAdapter : public framing::ChannelAdapter
+{
+  public:
+    ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&);
+    Channel& getChannel() { return *channel; }
+
+    void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+    void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+    void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+  private:
+    void handleMethodInContext(
+        boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+        const framing::MethodContext& context);
+    
+    class ClientOps;
+
+    std::auto_ptr<Channel> channel;
+    Connection& connection;
+    Client& client;
+    boost::shared_ptr<ClientOps> clientOps;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_ClientAdapter_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Mon Jan 29 08:13:24 2007
@@ -23,42 +23,115 @@
 #include <ClientMessage.h>
 #include <QpidError.h>
 #include <MethodBodyInstances.h>
+#include "Connection.h"
+
+// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
+// handling of errors that should close the connection or the channel.
+// Make sure the user thread receives a connection in each case.
+//
 
 using namespace boost;          //to use dynamic_pointer_cast
 using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
+const std::string Channel::OK("OK");
+
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
-    id(0),
-    con(0), 
-    out(0), 
+    connection(0), 
     incoming(0),
-    closed(true),
     prefetch(_prefetch), 
-    transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
-    version(8, 0)
+    transactional(_transactional)
 { }
 
 Channel::~Channel(){
-    stop();
+    close();
+}
+
+void Channel::open(ChannelId id, Connection& con)
+{
+    if (isOpen())
+        THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
+    connection = &con;
+    init(id, con, con.getVersion()); // ChannelAdapter initialization.
+    string oob;
+    if (id != 0) 
+        sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+}
+
+void Channel::protocolInit(
+    const std::string& uid, const std::string& pwd, const std::string& vhost) {
+    assert(connection);
+    responses.expect();
+    connection->connector->init(); // Send ProtocolInit block.
+    responses.receive<ConnectionStartBody>();
+    
+    FieldTable props;
+    string mechanism("PLAIN");
+    string response = ((char)0) + uid + ((char)0) + pwd;
+    string locale("en_US");
+    // TODO aconway 2007-01-26: Move client over to proxy model,
+    // symmetric with server.
+    ConnectionTuneBody::shared_ptr proposal =
+        sendAndReceive<ConnectionTuneBody>(
+            new ConnectionStartOkBody(
+                version, props, mechanism, response, locale));
+
+    /**
+     * Assume for now that further challenges will not be required
+     //receive connection.secure
+     responses.receive(connection_secure));
+     //send connection.secure-ok
+     connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+    **/
+
+    connection->send(
+        new AMQFrame(
+            version, 0,
+            new ConnectionTuneOkBody(
+                version, proposal->getChannelMax(),
+                connection->getMaxFrameSize(),
+                proposal->getHeartbeat())));
+    
+    u_int16_t heartbeat = proposal->getHeartbeat();
+    connection->connector->setReadTimeout(heartbeat * 2);
+    connection->connector->setWriteTimeout(heartbeat);
+
+    // Send connection open.
+    std::string capabilities;
+    responses.expect();
+    send(new AMQFrame(
+             version, 0,
+             new ConnectionOpenBody(version, vhost, capabilities, true)));
+    //receive connection.open-ok (or redirect, but ignore that for now
+    //esp. as using force=true).
+    responses.waitForResponse();
+    if(responses.validate<ConnectionOpenOkBody>()) {
+        //ok
+    }else if(responses.validate<ConnectionRedirectBody>()){
+        //ignore for now
+        ConnectionRedirectBody::shared_ptr redirect(
+            shared_polymorphic_downcast<ConnectionRedirectBody>(
+                responses.getResponse()));
+        std::cout << "Received redirection to " << redirect->getHost()
+                  << std::endl;
+    } else {
+        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+    }
 }
+    
+bool Channel::isOpen() const { return connection; }
 
 void Channel::setPrefetch(u_int16_t _prefetch){
     prefetch = _prefetch;
-    if(con != 0 && out != 0){
-        setQos();
-    }
+    setQos();
 }
 
 void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
-    sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+    sendAndReceive<BasicQosOkBody>(
+        new BasicQosBody(version, 0, prefetch, false));
     if(transactional){
-        sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+        sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
     }
 }
 
@@ -66,62 +139,51 @@
     string name = exchange.getName();
     string type = exchange.getType();
     FieldTable args;
-    AMQFrame*  frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
-    if(synch){
-        sendAndReceive(frame, method_bodies.exchange_declare_ok);
-    }else{
-        out->send(frame);
-    }
+    sendAndReceiveSync<ExchangeDeclareOkBody>(
+        synch,
+        new ExchangeDeclareBody(
+            version, 0, name, type, false, false, false, false, !synch, args));
 }
 
 void Channel::deleteExchange(Exchange& exchange, bool synch){
     string name = exchange.getName();
-    AMQFrame*  frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
-    if(synch){
-        sendAndReceive(frame, method_bodies.exchange_delete_ok);
-    }else{
-        out->send(frame);
-    }
+    sendAndReceiveSync<ExchangeDeleteOkBody>(
+        synch,
+        new ExchangeDeleteBody(version, 0, name, false, !synch));
 }
 
 void Channel::declareQueue(Queue& queue, bool synch){
     string name = queue.getName();
     FieldTable args;
-    AMQFrame*  frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false, 
-                                                             queue.isExclusive(), 
-                                                             queue.isAutoDelete(), !synch, args));
-    if(synch){
-        sendAndReceive(frame, method_bodies.queue_declare_ok);
+    sendAndReceiveSync<QueueDeclareOkBody>(
+        synch,
+        new QueueDeclareBody(
+            version, 0, name, false, false, 
+            queue.isExclusive(), queue.isAutoDelete(), !synch, args));        
+    if (synch) {
         if(queue.getName().length() == 0){
             QueueDeclareOkBody::shared_ptr response = 
-                dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+                shared_polymorphic_downcast<QueueDeclareOkBody>(
+                    responses.getResponse());
             queue.setName(response->getQueue());
         }
-    }else{
-        out->send(frame);
     }
 }
 
 void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
     //ticket, queue, ifunused, ifempty, nowait
     string name = queue.getName();
-    AMQFrame*  frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
-    if(synch){
-        sendAndReceive(frame, method_bodies.queue_delete_ok);
-    }else{
-        out->send(frame);
-    }
+    sendAndReceiveSync<QueueDeleteOkBody>(
+        synch,
+        new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
 }
 
 void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
     string e = exchange.getName();
     string q = queue.getName();
-    AMQFrame*  frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
-    if(synch){
-        sendAndReceive(frame, method_bodies.queue_bind_ok);
-    }else{
-        out->send(frame);
-    }
+    sendAndReceiveSync<QueueBindOkBody>(
+        synch,
+        new QueueBindBody(version, 0, q, e, key,!synch, args));
 }
 
 void Channel::consume(
@@ -129,52 +191,48 @@
     int ackMode, bool noLocal, bool synch, const FieldTable* fields)
 {
     string q = queue.getName();
-    AMQFrame* frame =
-        new AMQFrame(version, 
-            id,
-            new BasicConsumeBody(
-                version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
-                fields ? *fields : FieldTable()));
-    if(synch){
-        sendAndReceive(frame, method_bodies.basic_consume_ok);
-        BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+    sendAndReceiveSync<BasicConsumeOkBody>(
+        synch,
+        new BasicConsumeBody(
+            version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+            fields ? *fields : FieldTable()));
+    if (synch) {
+        BasicConsumeOkBody::shared_ptr response =
+            shared_polymorphic_downcast<BasicConsumeOkBody>(
+                responses.getResponse());
         tag = response->getConsumerTag();
-    }else{
-        out->send(frame);
     }
-    Consumer* c = new Consumer();
-    c->listener = listener;
-    c->ackMode = ackMode;
-    c->lastDeliveryTag = 0;
-    consumers[tag] = c;
+    Consumer& c = consumers[tag];
+    c.listener = listener;
+    c.ackMode = ackMode;
+    c.lastDeliveryTag = 0;
 }
 
-void Channel::cancel(std::string& tag, bool synch){
-    Consumer* c = consumers[tag];
-    if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
-        out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
-    }
-
-    AMQFrame*  frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
-    if(synch){
-        sendAndReceive(frame, method_bodies.basic_cancel_ok);
-    }else{
-        out->send(frame);
-    }
-    consumers.erase(tag);
-    if(c != 0){
-        delete c;
+void Channel::cancel(const std::string& tag, bool synch) {
+    ConsumerMap::iterator i = consumers.find(tag);
+    if (i != consumers.end()) {
+        Consumer& c = i->second;
+        if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
+            send(new BasicAckBody(version, c.lastDeliveryTag, true));
+        sendAndReceiveSync<BasicCancelOkBody>(
+            synch, new BasicCancelBody(version, tag, !synch));
+        consumers.erase(tag);
     }
 }
 
 void Channel::cancelAll(){
-    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
-        Consumer* c = i->second;
-        if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
-            out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+    while(!consumers.empty()) {
+        Consumer c = consumers.begin()->second;
+        consumers.erase(consumers.begin());
+        if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+            && c.lastDeliveryTag > 0)
+        {
+            // Let exceptions propagate: if one ack fails there is no
+            // point trying the rest. NB: no memory leaks if that
+            // happens, since ConsumerMap holds values, not pointers.
+            // 
+            send(new BasicAckBody(version, c.lastDeliveryTag, true));
         }
-        consumers.erase(i);
-        delete c;
     }
 }
 
@@ -191,26 +249,28 @@
     retrieved = 0;
 }
 
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
     string name = queue.getName();
-    AMQFrame*  frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
+    AMQBody::shared_ptr  body(new BasicGetBody(version, 0, name, ackMode));
     responses.expect();
-    out->send(frame);
+    send(body);
     responses.waitForResponse();
     AMQMethodBody::shared_ptr response = responses.getResponse();
-    if(method_bodies.basic_get_ok.match(response.get())){
+    if(response->isA<BasicGetOkBody>()) {
         if(incoming != 0){
             std::cout << "Existing message not complete" << std::endl;
+            // FIXME aconway 2007-01-26: close the connection? the channel?
             THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
         }else{
             incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
         }
         retrieve(msg);
         return true;
-    }if(method_bodies.basic_get_empty.match(response.get())){
+    }if(response->isA<BasicGetEmptyBody>()){
         return false;
     }else{
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+        // FIXME aconway 2007-01-26: must close the connection.
+        THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
     }
 }
 
@@ -219,25 +279,24 @@
     string e = exchange.getName();
     string key = routingKey;
 
-    out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+    send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
     //break msg up into header frame and content frame(s) and send these
     string data = msg.getData();
     msg.header->setContentSize(data.length());
-    AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
-    out->send(new AMQFrame(version, id, body));
+    send(msg.header);
     
     u_int64_t data_length = data.length();
     if(data_length > 0){
-        u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+        u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
         if(data_length < frag_size){
-            out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+            send(new AMQContentBody(data));
         }else{
             u_int32_t offset = 0;
             u_int32_t remaining = data_length - offset;
             while (remaining > 0) {
                 u_int32_t length = remaining > frag_size ? frag_size : remaining;
                 string frag(data.substr(offset, length));
-                out->send(new AMQFrame(version, id, new AMQContentBody(frag)));                          
+                send(new AMQContentBody(frag));                          
                 
                 offset += length;
                 remaining = data_length - offset;
@@ -247,56 +306,48 @@
 }
     
 void Channel::commit(){
-    AMQFrame*  frame = new AMQFrame(version, id, new TxCommitBody(version));
-    sendAndReceive(frame, method_bodies.tx_commit_ok);
+    sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
 }
 
 void Channel::rollback(){
-    AMQFrame*  frame = new AMQFrame(version, id, new TxRollbackBody(version));
-    sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-    
-void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
-    // FIXME aconway 2007-01-19: request/response handling.
-    handleMethod(body);
+    sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
 }
 
-void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
-    // FIXME aconway 2007-01-19: request/response handling.
-    handleMethod(body);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
-    //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+void Channel::handleMethodInContext(
+    AMQMethodBody::shared_ptr body, const MethodContext&)
+{
+    //channel.flow, channel.close, basic.deliver, basic.return or a
+    //response to a synchronous request
     if(responses.isWaiting()){
         responses.signalResponse(body);
-    }else if(method_bodies.basic_deliver.match(body.get())){
+    }else if(body->isA<BasicDeliverBody>()) {
         if(incoming != 0){
+            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
             std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
             THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
         }else{
             incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
         }
-    }else if(method_bodies.basic_return.match(body.get())){
+    }else if(body->isA<BasicReturnBody>()){
         if(incoming != 0){
             std::cout << "Existing message not complete" << std::endl;
             THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
         }else{
             incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
         }
-    }else if(method_bodies.channel_close.match(body.get())){
-        con->removeChannel(this);
-        //need to signal application that channel has been closed through exception
-
-    }else if(method_bodies.channel_flow.match(body.get())){
-        
-    }else{
-        //signal error
-        std::cout << "Unhandled method: " << *body << std::endl;
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+    }else if(body->isA<ChannelCloseBody>()){
+        peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+    }else if(body->isA<ChannelFlowBody>()){
+        // TODO aconway 2007-01-24: 
+    }else if(body->isA<ConnectionCloseBody>()){
+        connection->close();
+    }else{
+        connection->close(
+            504, "Unrecognised method",
+            body->amqpClassId(), body->amqpMethodId());
     }
 }
-    
+
 void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
     if(incoming == 0){
         //handle invalid frame sequence
@@ -331,27 +382,16 @@
     dispatcher = Thread(this);
 }
 
-void Channel::stop(){
-    {
-        Monitor::ScopedLock l(dispatchMonitor);
-        closed = true;
-        responses.signalResponse(AMQMethodBody::shared_ptr());
-        dispatchMonitor.notify();
-    }
-    dispatcher.join();        
-}
-
 void Channel::run(){
     dispatch();
 }
 
 void Channel::enqueue(){
+    Monitor::ScopedLock l(retrievalMonitor);
     if(incoming->isResponse()){
-        Monitor::ScopedLock l(retrievalMonitor);
         retrieved = incoming;
         retrievalMonitor.notify();
     }else{
-        Monitor::ScopedLock l(dispatchMonitor);
         messages.push(incoming);
         dispatchMonitor.notify();
     }
@@ -360,7 +400,7 @@
 
 IncomingMessage* Channel::dequeue(){
     Monitor::ScopedLock l(dispatchMonitor);
-    while(messages.empty() && !closed){
+    while(messages.empty() && isOpen()){
         dispatchMonitor.wait();
     }    
     IncomingMessage* msg = 0;
@@ -371,25 +411,25 @@
     return msg; 
 }
 
-void Channel::deliver(Consumer* consumer, Message& msg){
+void Channel::deliver(Consumer& consumer, Message& msg){
     //record delivery tag:
-    consumer->lastDeliveryTag = msg.getDeliveryTag();
+    consumer.lastDeliveryTag = msg.getDeliveryTag();
 
     //allow registered listener to handle the message
-    consumer->listener->received(msg);
+    consumer.listener->received(msg);
 
     //if the handler calls close on the channel or connection while
     //handling this message, then consumer will now have been deleted.
-    if(!closed){
+    if(isOpen()){
         bool multiple(false);
-        switch(consumer->ackMode){
-        case LAZY_ACK: 
+        switch(consumer.ackMode){
+          case LAZY_ACK: 
             multiple = true;
-            if(++(consumer->count) < prefetch) break;
+            if(++(consumer.count) < prefetch) break;
             //else drop-through
-        case AUTO_ACK:
-          out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
-          consumer->lastDeliveryTag = 0;
+          case AUTO_ACK:
+            send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+            consumer.lastDeliveryTag = 0;
         }
     }
 
@@ -399,7 +439,7 @@
 }
 
 void Channel::dispatch(){
-    while(!closed){
+    while(isOpen()){
         IncomingMessage* incomingMsg = dequeue();
         if(incomingMsg){
             //Note: msg is currently only valid for duration of this call
@@ -416,12 +456,10 @@
                 msg.deliveryTag = incomingMsg->getDeliveryTag();
                 std::string tag = incomingMsg->getConsumerTag();
                 
-                if(consumers[tag] == 0){
-                    //signal error
+                if(consumers.find(tag) == consumers.end())
                     std::cout << "Unknown consumer: " << tag << std::endl;
-                }else{
+                else
                     deliver(consumers[tag], msg);
-                }
             }
             delete incomingMsg;
         }
@@ -432,14 +470,60 @@
     returnsHandler = handler;
 }
 
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
-    responses.expect();
-    out->send(frame);
-    responses.receive(body);
+// Close called by local application.
+void Channel::close(
+    u_int16_t code, const std::string& text,
+    ClassId classId, MethodId methodId)
+{
+    // FIXME aconway 2007-01-26: Locking?
+    if (getId() != 0 && isOpen()) {
+        try {
+            sendAndReceive<ChannelCloseOkBody>(
+                new ChannelCloseBody(version, code, text, classId, methodId));
+            cancelAll();
+            closeInternal();
+        } catch (...) {
+            closeInternal();
+            throw;
+        }
+    }
 }
 
-void Channel::close(){
-    if(con != 0){
-        con->closeChannel(this);
+// Channel closed by peer.
+void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+    assert(isOpen());
+    closeInternal();
+    // FIXME aconway 2007-01-26: How to throw the proper exception
+    // to the application thread?
+}
+
+void Channel::closeInternal() {
+    assert(isOpen());
+    {
+        Monitor::ScopedLock l(dispatchMonitor);
+        static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
+        connection = 0;
+        // A 0 response means we are closed.
+        responses.signalResponse(AMQMethodBody::shared_ptr());
+        dispatchMonitor.notify();
     }
+    dispatcher.join();        
 }
+
+void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+{
+    responses.expect();
+    send(toSend);
+    responses.receive(c, m);
+}
+
+void Channel::sendAndReceiveSync(
+    bool sync, AMQBody* body, ClassId c, MethodId m)
+{
+    if(sync)
+        sendAndReceive(body, c, m);
+    else
+        send(body);
+}
+
+

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Mon Jan 29 08:13:24 2007
@@ -27,7 +27,6 @@
 #include "sys/types.h"
 
 #include <framing/amqp_framing.h>
-#include <Connection.h>
 #include <ClientExchange.h>
 #include <IncomingMessage.h>
 #include <ClientMessage.h>
@@ -35,86 +34,126 @@
 #include <ClientQueue.h>
 #include <ResponseHandler.h>
 #include <ReturnedMessageHandler.h>
+#include "Runnable.h"
+#include "ChannelAdapter.h"
+#include "Thread.h"
 
 namespace qpid {
+namespace framing {
+class ChannelCloseBody;
+}
+
 namespace client {
-    /**
-     * The available acknowledgements modes
-     * 
-     * \ingroup clientapi
-     */
-    enum ack_modes {
-        /** No acknowledgement will be sent, broker can
-            discard messages as soon as they are delivered
-            to a consumer using this mode. **/
-        NO_ACK     = 0,  
-        /** Each message will be automatically
-            acknowledged as soon as it is delivered to the
-            application **/  
-        AUTO_ACK   = 1,  
-        /** Acknowledgements will be sent automatically,
-            but not for each message. **/
-        LAZY_ACK   = 2,
-        /** The application is responsible for explicitly
-            acknowledging messages. **/  
-        CLIENT_ACK = 3 
+
+class Connection;
+
+/**
+ * The available acknowledgements modes
+ * 
+ * \ingroup clientapi
+ */
+enum ack_modes {
+    /** No acknowledgement will be sent, broker can
+        discard messages as soon as they are delivered
+        to a consumer using this mode. **/
+    NO_ACK     = 0,  
+    /** Each message will be automatically
+        acknowledged as soon as it is delivered to the
+        application **/  
+    AUTO_ACK   = 1,  
+    /** Acknowledgements will be sent automatically,
+        but not for each message. **/
+    LAZY_ACK   = 2,
+    /** The application is responsible for explicitly
+        acknowledging messages. **/  
+    CLIENT_ACK = 3 
+};
+
+/**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ * 
+ * \ingroup clientapi
+ */
+class Channel : public framing::ChannelAdapter,
+                public sys::Runnable
+{
+    struct Consumer{
+        MessageListener* listener;
+        int ackMode;
+        int count;
+        u_int64_t lastDeliveryTag;
     };
+    typedef std::map<std::string, Consumer> ConsumerMap;
+    static const std::string OK;
+        
+    Connection* connection;
+    sys::Thread dispatcher;
+    IncomingMessage* incoming;
+    ResponseHandler responses;
+    std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+    IncomingMessage* retrieved;//holds response to basic.get
+    sys::Monitor dispatchMonitor;
+    sys::Monitor retrievalMonitor;
+    ConsumerMap consumers;
+    ReturnedMessageHandler* returnsHandler;
+
+    u_int16_t prefetch;
+    const bool transactional;
+    framing::ProtocolVersion version;
+
+    void enqueue();
+    void retrieve(Message& msg);
+    IncomingMessage* dequeue();
+    void dispatch();
+    void deliver(Consumer& consumer, Message& msg);
+        
+    void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+    void handleContent(framing::AMQContentBody::shared_ptr body);
+    void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+        
+    void handleMethodInContext(
+        framing::AMQMethodBody::shared_ptr,
+        const framing::MethodContext& method);
+    void setQos();
+    void cancelAll();
+
+    void protocolInit(
+        const std::string& uid, const std::string& pwd,
+        const std::string& vhost);
+    
+    void sendAndReceive(
+        framing::AMQBody*, framing::ClassId, framing::MethodId);
+
+    void sendAndReceiveSync(
+        bool sync,
+        framing::AMQBody*, framing::ClassId, framing::MethodId);
+
+    template <class BodyType>
+    boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+        sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+        return boost::shared_polymorphic_downcast<BodyType>(
+            responses.getResponse());
+    }
+
+    template <class BodyType> void sendAndReceiveSync(
+        bool sync, framing::AMQBody* body) {
+        sendAndReceiveSync(
+            sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+    }
+
+    void open(framing::ChannelId, Connection&);
+    void closeInternal();
+    void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+    
+  friend class Connection;
+        
+  public:
 
+    bool isOpen() const;
+    
     /**
-     * Represents an AMQP channel, i.e. loosely a session of work. It
-     * is through a channel that most of the AMQP 'methods' are
-     * exposed.
-     * 
-     * \ingroup clientapi
-     */
-    class Channel : private virtual framing::BodyHandler,
-                    public virtual sys::Runnable
-    {
-        struct Consumer{
-            MessageListener* listener;
-            int ackMode;
-            int count;
-            u_int64_t lastDeliveryTag;
-        };
-        typedef std::map<std::string,Consumer*>::iterator consumer_iterator; 
-
-	u_int16_t id;
-	Connection* con;
-	sys::Thread dispatcher;
-	framing::OutputHandler* out;
-	IncomingMessage* incoming;
-	ResponseHandler responses;
-	std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
-	IncomingMessage* retrieved;//holds response to basic.get
-	sys::Monitor dispatchMonitor;
-	sys::Monitor retrievalMonitor;
-	std::map<std::string, Consumer*> consumers;
-	ReturnedMessageHandler* returnsHandler;
-	bool closed;
-
-        u_int16_t prefetch;
-        const bool transactional;
-        framing::ProtocolVersion version;
-
-	void enqueue();
-	void retrieve(Message& msg);
-	IncomingMessage* dequeue();
-	void dispatch();
-	void stop();
-	void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);            
-        void deliver(Consumer* consumer, Message& msg);
-        void setQos();
-	void cancelAll();
-
-	virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
-	virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
-	virtual void handleContent(framing::AMQContentBody::shared_ptr body);
-	virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
-        void handleRequest(framing::AMQRequestBody::shared_ptr);
-        void handleResponse(framing::AMQResponseBody::shared_ptr);
-
-    public:
-        /**
          * Creates a channel object.
          * 
          * @param transactional if true, the publishing and acknowledgement
@@ -124,199 +163,202 @@
          * @param prefetch specifies the number of unacknowledged
          * messages the channel is willing to have sent to it
          * asynchronously
-         */
+     */
 	Channel(bool transactional = false, u_int16_t prefetch = 500);
 	~Channel();
 
-        /**
-         * Declares an exchange.
-         * 
-         * In AMQP Exchanges are the destinations to which messages
-         * are published. They have Queues bound to them and route
-         * messages they receive to those queues. The routing rules
-         * depend on the type of the exchange.
-         * 
-         * @param exchange an Exchange object representing the
-         * exchange to declare
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void declareExchange(Exchange& exchange, bool synch = true);
-        /**
-         * Deletes an exchange
-         * 
-         * @param exchange an Exchange object representing the exchange to delete
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void deleteExchange(Exchange& exchange, bool synch = true);
-        /**
-         * Declares a Queue
-         * 
-         * @param queue a Queue object representing the queue to declare
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void declareQueue(Queue& queue, bool synch = true);
-        /**
-         * Deletes a Queue
-         * 
-         * @param queue a Queue object representing the queue to delete
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
-        /**
-         * Binds a queue to an exchange. The exact semantics of this
-         * (in particular how 'routing keys' and 'binding arguments'
-         * are used) depends on the type of the exchange.
-         * 
-         * @param exchange an Exchange object representing the
-         * exchange to bind to
-         * 
-         * @param queue a Queue object representing the queue to be
-         * bound
-         * 
-         * @param key the 'routing key' for the binding
-         * 
-         * @param args the 'binding arguments' for the binding
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void bind(const Exchange& exchange, const Queue& queue, const std::string& key, 
-                  const framing::FieldTable& args, bool synch = true);
-        /**
-         * Creates a 'consumer' for a queue. Messages in (or arriving
-         * at) that queue will be delivered to consumers
-         * asynchronously.
-         * 
-         * @param queue a Queue instance representing the queue to
-         * consume from
-         * 
-         * @param tag an identifier to associate with the consumer
-         * that can be used to cancel its subscription (if empty, this
-         * will be assigned by the broker)
-         * 
-         * @param listener a pointer to an instance of an
-         * implementation of the MessageListener interface. Messages
-         * received from this queue for this consumer will result in
-         * invocation of the received() method on the listener, with
-         * the message itself passed in.
-         * 
-         * @param ackMode the mode of acknowledgement that the broker
-         * should assume for this consumer. @see ack_modes
-         * 
-         * @param noLocal if true, this consumer will not be sent any
-         * message published by this connection
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-        void consume(
-            Queue& queue, std::string& tag, MessageListener* listener, 
-            int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
-            const framing::FieldTable* fields = 0);
+    /**
+     * Declares an exchange.
+     * 
+     * In AMQP Exchanges are the destinations to which messages
+     * are published. They have Queues bound to them and route
+     * messages they receive to those queues. The routing rules
+     * depend on the type of the exchange.
+     * 
+     * @param exchange an Exchange object representing the
+     * exchange to declare
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void declareExchange(Exchange& exchange, bool synch = true);
+    /**
+     * Deletes an exchange
+     * 
+     * @param exchange an Exchange object representing the exchange to delete
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void deleteExchange(Exchange& exchange, bool synch = true);
+    /**
+     * Declares a Queue
+     * 
+     * @param queue a Queue object representing the queue to declare
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void declareQueue(Queue& queue, bool synch = true);
+    /**
+     * Deletes a Queue
+     * 
+     * @param queue a Queue object representing the queue to delete
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+    /**
+     * Binds a queue to an exchange. The exact semantics of this
+     * (in particular how 'routing keys' and 'binding arguments'
+     * are used) depends on the type of the exchange.
+     * 
+     * @param exchange an Exchange object representing the
+     * exchange to bind to
+     * 
+     * @param queue a Queue object representing the queue to be
+     * bound
+     * 
+     * @param key the 'routing key' for the binding
+     * 
+     * @param args the 'binding arguments' for the binding
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void bind(const Exchange& exchange, const Queue& queue, const std::string& key, 
+              const framing::FieldTable& args, bool synch = true);
+    /**
+     * Creates a 'consumer' for a queue. Messages in (or arriving
+     * at) that queue will be delivered to consumers
+     * asynchronously.
+     * 
+     * @param queue a Queue instance representing the queue to
+     * consume from
+     * 
+     * @param tag an identifier to associate with the consumer
+     * that can be used to cancel its subscription (if empty, this
+     * will be assigned by the broker)
+     * 
+     * @param listener a pointer to an instance of an
+     * implementation of the MessageListener interface. Messages
+     * received from this queue for this consumer will result in
+     * invocation of the received() method on the listener, with
+     * the message itself passed in.
+     * 
+     * @param ackMode the mode of acknowledgement that the broker
+     * should assume for this consumer. @see ack_modes
+     * 
+     * @param noLocal if true, this consumer will not be sent any
+     * message published by this connection
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
         
-        /**
-         * Cancels a subscription previously set up through a call to consume().
-         *
-         * @param tag the identifier used (or assigned) in the consume
-         * request that set up the subscription to be cancelled.
-         * 
-         * @param synch if true this call will block until a response
-         * is received from the broker
-         */
-	void cancel(std::string& tag, bool synch = true);
-        /**
-         * Synchronous pull of a message from a queue.
-         * 
-         * @param msg a message object that will contain the message
-         * headers and content if the call completes.
-         * 
-         * @param queue the queue to consume from
-         * 
-         * @param ackMode the acknowledgement mode to use (@see
-         * ack_modes)
-         * 
-         * @return true if a message was succcessfully dequeued from
-         * the queue, false if the queue was empty.
-         */
-        bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
-        /**
-         * Publishes (i.e. sends a message to the broker).
-         * 
-         * @param msg the message to publish
-         * 
-         * @param exchange the exchange to publish the message to
-         * 
-         * @param routingKey the routing key to publish with
-         * 
-         * @param mandatory if true and the exchange to which this
-         * publish is directed has no matching bindings, the message
-         * will be returned (see setReturnedMessageHandler()).
-         * 
-         * @param immediate if true and there is no consumer to
-         * receive this message on publication, the message will be
-         * returned (see setReturnedMessageHandler()).
-         */
-        void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, 
-                     bool mandatory = false, bool immediate = false);
-
-        /**
-         * For a transactional channel this will commit all
-         * publications and acknowledgements since the last commit (or
-         * the channel was opened if there has been no previous
-         * commit). This will cause published messages to become
-         * available to consumers and acknowledged messages to be
-         * consumed and removed from the queues they were dispatched
-         * from.
-         * 
-         * Transactionailty of a channel is specified when the channel
-         * object is created (@see Channel()).
-         */
-        void commit();
-        /**
-         * For a transactional channel, this will rollback any
-         * publications or acknowledgements. It will be as if the
-         * ppblished messages were never sent and the acknowledged
-         * messages were never consumed.
-         */
-        void rollback();
-
-        /**
-         * Change the prefetch in use.
-         */
-        void setPrefetch(u_int16_t prefetch);
-
-	/**
-	 * Start message dispatching on a new thread
-	 */
-	void start();
-	/**
-	 * Do message dispatching on this thread
-	 */
-	void run();
-
-        /**
-         * Closes a channel, stopping any message dispatching.
-         */
-        void close();
-
-        /**
-         * Set a handler for this channel that will process any
-         * returned messages
-         * 
-         * @see publish()
-         */
-	void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+    /**
+     * Cancels a subscription previously set up through a call to consume().
+     *
+     * @param tag the identifier used (or assigned) in the consume
+     * request that set up the subscription to be cancelled.
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void cancel(const std::string& tag, bool synch = true);
+    /**
+     * Synchronous pull of a message from a queue.
+     * 
+     * @param msg a message object that will contain the message
+     * headers and content if the call completes.
+     * 
+     * @param queue the queue to consume from
+     * 
+     * @param ackMode the acknowledgement mode to use (@see
+     * ack_modes)
+     * 
+     * @return true if a message was successfully dequeued from
+     * the queue, false if the queue was empty.
+     */
+    bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+    /**
+     * Publishes (i.e. sends a message to the broker).
+     * 
+     * @param msg the message to publish
+     * 
+     * @param exchange the exchange to publish the message to
+     * 
+     * @param routingKey the routing key to publish with
+     * 
+     * @param mandatory if true and the exchange to which this
+     * publish is directed has no matching bindings, the message
+     * will be returned (see setReturnedMessageHandler()).
+     * 
+     * @param immediate if true and there is no consumer to
+     * receive this message on publication, the message will be
+     * returned (see setReturnedMessageHandler()).
+     */
+    void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false);
 
-        friend class Connection;
-    };
+    /**
+     * For a transactional channel this will commit all
+     * publications and acknowledgements since the last commit (or
+     * the channel was opened if there has been no previous
+     * commit). This will cause published messages to become
+     * available to consumers and acknowledged messages to be
+     * consumed and removed from the queues they were dispatched
+     * from.
+     * 
+     * Transactionality of a channel is specified when the channel
+     * object is created (@see Channel()).
+     */
+    void commit();
+    /**
+     * For a transactional channel, this will rollback any
+     * publications or acknowledgements. It will be as if the
+     * published messages were never sent and the acknowledged
+     * messages were never consumed.
+     */
+    void rollback();
+
+    /**
+     * Change the prefetch in use.
+     */
+    void setPrefetch(u_int16_t prefetch);
+
+    /**
+     * Start message dispatching on a new thread
+     */
+    void start();
+
+    // TODO aconway 2007-01-26: Can it be private?
+    /**
+     * Dispatch messages on this channel in the calling thread.
+     */
+    void run();
+
+    /**
+     * Close the channel with optional error information.
+     * Closing a channel that is not open has no effect.
+     */
+    void close(
+        framing::ReplyCode = 200, const std::string& =OK,
+        framing::ClassId = 0, framing::MethodId  = 0);
+
+    /**
+     * Set a handler for this channel that will process any
+     * returned messages
+     * 
+     * @see publish()
+     */
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+};
 
 }
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Mon Jan 29 08:13:24 2007
@@ -18,35 +18,46 @@
  * under the License.
  *
  */
+#include <boost/format.hpp>
+
 #include <Connection.h>
 #include <ClientChannel.h>
 #include <ClientMessage.h>
 #include <QpidError.h>
 #include <iostream>
+#include <sstream>
 #include <MethodBodyInstances.h>
 
-using namespace qpid::client;
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace qpid::sys;
 
-u_int16_t Connection::channelIdCounter;
+
+namespace qpid {
+namespace client {
+
+ChannelId Connection::channelIdCounter;
+
+const std::string Connection::OK("OK");
 
 Connection::Connection(
     bool debug, u_int32_t _max_frame_size,
-    qpid::framing::ProtocolVersion* _version
+    const framing::ProtocolVersion& _version
 ) : max_frame_size(_max_frame_size), closed(true),
-    version(_version->getMajor(),_version->getMinor())
+    version(_version)
 {
-    connector = new Connector(
-        version, requester, responder, debug, _max_frame_size);
+    connector = new Connector(version, debug, _max_frame_size);
 }
 
 Connection::~Connection(){
     delete connector;
 }
 
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+void Connection::open(
+    const std::string& _host, int _port, const std::string& uid,
+    const std::string& pwd, const std::string& virtualhost)
+{
+
     host = _host;
     port = _port;
     connector->setInputHandler(this);
@@ -55,197 +66,69 @@
     out = connector->getOutputHandler();
     connector->connect(host, port);
     
-    ProtocolInitiation* header = new ProtocolInitiation(version);
-    responses.expect();
-    connector->init(header);
-    responses.receive(method_bodies.connection_start);
-
-    FieldTable props;
-    string mechanism("PLAIN");
-    string response = ((char)0) + uid + ((char)0) + pwd;
-    string locale("en_US");
-    responses.expect();
-    out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
-    /**
-     * Assume for now that further challenges will not be required
-    //receive connection.secure
-    responses.receive(connection_secure));
-    //send connection.secure-ok
-    out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
-    **/
-
-    responses.receive(method_bodies.connection_tune);
-
-    ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
-    out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
-    u_int16_t heartbeat = proposal->getHeartbeat();
-    connector->setReadTimeout(heartbeat * 2);
-    connector->setWriteTimeout(heartbeat);
-
-    //send connection.open
-    string capabilities;
-    string vhost = virtualhost;
-    responses.expect();
-    out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
-    //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
-    responses.waitForResponse();
-    if(responses.validate(method_bodies.connection_open_ok)){
-        //ok
-    }else if(responses.validate(method_bodies.connection_redirect)){
-        //ignore for now
-        ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
-        std::cout << "Received redirection to " << redirect->getHost() << std::endl;
-    }else{
-        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
-    }
-    
+    // Open the special channel 0.
+    channels[0] = &channel0;
+    channel0.open(0, *this);
+    channel0.protocolInit(uid, pwd, virtualhost);
 }
 
-void Connection::close(){
-    if(!closed){
-        u_int16_t code(200);
-        string text("Ok");
-        u_int16_t classId(0);
-        u_int16_t methodId(0);
-        
-        sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+void Connection::close(
+    ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+    if(!closed) {
+        channel0.sendAndReceive<ConnectionCloseOkBody>(
+            new ConnectionCloseBody(
+                getVersion(), code, msg, classId, methodId));
         connector->close();
     }
 }
 
-void Connection::openChannel(Channel* channel){
-    channel->con = this;
-    channel->id = ++channelIdCounter;
-    channel->out = out;
-    channels[channel->id] = channel;
-    //now send frame to open channel and wait for response
-    string oob;
-    channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
-    channel->setQos();
-    channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
-    //send frame to close channel
-    u_int16_t code(200);
-    string text("Ok");
-    u_int16_t classId(0);
-    u_int16_t methodId(0);
-    closeChannel(channel, code, text, classId, methodId);
-}
-
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
-    //send frame to close channel
-    channel->cancelAll();
-    channel->closed = true;
-    channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
-    channel->con = 0;
-    channel->out = 0;
-    removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
-    //send frame to close channel
-
-    channels.erase(channel->id);
-    channel->out = 0;    
-    channel->id = 0;
-    channel->con = 0;
+// FIXME aconway 2007-01-26: make channels owned and created by connection?
+void Connection::openChannel(Channel& channel) {
+    ChannelId id = ++channelIdCounter;
+    assert (channels.find(id) == channels.end());
+    assert(out);
+    channels[id] = &channel;
+    channel.open(id, *this);
 }
 
-void Connection::received(AMQFrame* frame){
-    AMQBody::shared_ptr body = frame->getBody();
-    u_int8_t type = body->type();
-    if (type == REQUEST_BODY) 
-        responder.received(AMQRequestBody::getData(body));
-    handleFrame(frame);
-    if (type == RESPONSE_BODY)
-        requester.processed(AMQResponseBody::getData(body));
-}
-
-void Connection::handleFrame(AMQFrame* frame){
-    u_int16_t channelId = frame->getChannel();
-
-    if(channelId == 0){
-        this->handleBody(frame->getBody());
-    }else{
-        Channel* channel = channels[channelId];
-        if(channel == 0){
-            error(504, "Unknown channel");
-        }else{
-            try{
-                channel->handleBody(frame->getBody());
-            }catch(qpid::QpidError e){
-                channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
-            }
-        }
-    }
+void Connection::erase(ChannelId id) {
+    channels.erase(id);
 }
 
-void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
-    // FIXME aconway 2007-01-19: request/response handling.
-    handleMethod(body);
-}
-
-void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
-    // FIXME aconway 2007-01-19: request/response handling.
-    handleMethod(body);
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
-    //connection.close, basic.deliver, basic.return or a response to a synchronous request
-    if(responses.isWaiting()){
-        responses.signalResponse(body);
-    }else if(method_bodies.connection_close.match(body.get())){
-        //send back close ok
-        //close socket
-        ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
-        std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
-        connector->close();
-    }else{
-        std::cout << "Unhandled method for connection: " << *body << std::endl;
-        error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+// Dispatch an incoming frame to its channel; throws if the channel id is unknown.
+void Connection::received(AMQFrame* frame){
+    // FIXME aconway 2007-01-25: Mutex 
+    ChannelId id = frame->getChannel();
+    // find(), not operator[]: a lookup must not insert a null map entry.
+    ChannelMap::iterator i = channels.find(id);
+    // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+    // client. Need to review use of exceptions.
+    if (i == channels.end() || i->second == 0)
+        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+            (boost::format("Invalid channel number %d") % id).str());
+    Channel* channel = i->second;
+    try{ channel->handleBody(frame->getBody()); }
+    catch(const qpid::QpidError& e){
+        channelException(*channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
     }
 }
-    
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
-    error(504, "Channel error: received header body with channel 0.");
-}
-    
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
-    error(504, "Channel error: received content body with channel 0.");
-}
-    
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
 
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
-    responses.expect();
+void Connection::send(AMQFrame* frame) {
     out->send(frame);
-    responses.receive(body);
 }
 
-void Connection::error(int code, const string& msg, int classid, int methodid){
-    std::cout << "Connection exception generated: " << code << msg;
-    if(classid || methodid){
-        std::cout << " [" << methodid << ":" << classid << "]";
-    }
-    std::cout << std::endl;
-    sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
-    connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
-    std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
-    int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+void Connection::channelException(
+    Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+    int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
     string msg = e.msg;
-    if(method == 0){
-        closeChannel(channel, code, msg);
-    }else{
-        closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
-    }
+    if(method == 0)
+        channel.close(code, msg);
+    else
+        channel.close(
+            code, msg, method->amqpClassId(), method->amqpMethodId());
 }
 
 void Connection::idleIn(){
@@ -259,9 +142,12 @@
 
 void Connection::shutdown(){
     closed = true;
-    //close all channels
-    for(iterator i = channels.begin(); i != channels.end(); i++){
-        i->second->stop();
+    // Close all channels; close() is expected to remove each one from the map.
+    // A null entry is erased here, since no close() call would ever remove it.
+    while(!channels.empty()){
+        if (channels.begin()->second == 0) channels.erase(channels.begin());
+        else channels.begin()->second->close();
     }
-    responses.signalResponse(AMQMethodBody::shared_ptr());
 }
+
+}} // namespace qpid::client

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Mon Jan 29 08:13:24 2007
@@ -1,3 +1,6 @@
+#ifndef _Connection_
+#define _Connection_
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,15 +23,15 @@
  */
 #include <map>
 #include <string>
+#include <boost/shared_ptr.hpp>
 
-#ifndef _Connection_
-#define _Connection_
-
+#include "amqp_types.h"
 #include <QpidError.h>
 #include <Connector.h>
 #include <sys/ShutdownHandler.h>
 #include <sys/TimeoutHandler.h>
 
+#include "framing/amqp_types.h"
 #include <framing/amqp_framing.h>
 #include <ClientExchange.h>
 #include <IncomingMessage.h>
@@ -37,150 +40,152 @@
 #include <ClientQueue.h>
 #include <ResponseHandler.h>
 #include <AMQP_HighestVersion.h>
-#include "Requester.h"
-#include "Responder.h"
+#include "ClientChannel.h"
 
 namespace qpid {
 
+/**
+ * The client namespace contains all classes that make up a client
+ * implementation of the AMQP protocol. The key classes that form
+ * the basis of the client API to be used by applications are
+ * Connection and Channel.
+ */
+namespace client {
+
+class Channel;
+
+/**
+ * \internal provide access to selected private channel functions
+ * for the Connection without making it a friend of the entire channel.
+ */
+class ConnectionForChannel :
+        public framing::InputHandler,
+        public framing::OutputHandler,
+        public sys::TimeoutHandler, 
+        public sys::ShutdownHandler
+        
+{
+  private:
+  friend class Channel;
+    virtual void erase(framing::ChannelId) = 0;
+};
+
+
+/**
+ * \defgroup clientapi Application API for an AMQP client
+ */
+
+/**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then opening one or
+ * more Channels over that connection.
+ * 
+ * \ingroup clientapi
+ */
+class Connection : public ConnectionForChannel
+{
+    typedef std::map<framing::ChannelId, Channel*> ChannelMap;
+
+    static framing::ChannelId channelIdCounter;
+    static const std::string OK;
+        
+    std::string host;
+    int port;
+    const u_int32_t max_frame_size;
+    ChannelMap channels; 
+    Connector* connector;
+    framing::OutputHandler* out;
+    volatile bool closed;
+    framing::ProtocolVersion version;
+        
+    void erase(framing::ChannelId);
+    void channelException(
+        Channel&, framing::AMQMethodBody*, const QpidError&);
+    Channel channel0;
+
+    // TODO aconway 2007-01-26: too many friendships, untangle these classes.
+  friend class Channel;
+    
+  public:
+    const framing::ProtocolVersion& getVersion() const { return version; }
+        
+    /**
+     * Creates a connection object, but does not open the
+     * connection.  
+     * 
+     * @param _version the version of the protocol to connect with
+     *
+     * @param debug turns on tracing for the connection
+     * (i.e. prints details of the frames sent and received to std
+     * out). Optional and defaults to false.
+     * 
+     * @param max_frame_size the maximum frame size that the
+     * client will accept. Optional and defaults to 65536.
+     */
+    Connection(
+        bool debug = false, u_int32_t max_frame_size = 65536, 
+        const framing::ProtocolVersion& = framing::highestProtocolVersion);
+    ~Connection();
+
     /**
-     * The client namespace contains all classes that make up a client
-     * implementation of the AMQP protocol. The key classes that form
-     * the basis of the client API to be used by applications are
-     * Connection and Channel.
+     * Opens a connection to a broker.
+     * 
+     * @param host the host on which the broker is running
+     * 
+     * @param port the port on which the broker is listening
+     * 
+     * @param uid the userid to connect with
+     * 
+     * @param pwd the password to connect with (currently SASL
+     * PLAIN is the only authentication method supported so this
+     * is sent in clear text)
+     * 
+     * @param virtualhost the AMQP virtual host to use (virtual
+     * hosts, where implemented(!), provide namespace partitioning
+     * within a single broker).
      */
-namespace client {
+    void open(const std::string& host, int port = 5672, 
+              const std::string& uid = "guest", const std::string& pwd = "guest", 
+              const std::string& virtualhost = "/");
 
-    class Channel;
 
     /**
-     * \defgroup clientapi Application API for an AMQP client
+     * Close the connection with optional error information for the peer.
+     *
+     * Any further use of this connection (without reopening it) will
+     * not succeed.
      */
+    void close(framing::ReplyCode=200, const std::string& msg=OK,
+               framing::ClassId = 0, framing::MethodId = 0);
 
     /**
-     * Represents a connection to an AMQP broker. All communication is
-     * initiated by establishing a connection, then opening one or
-     * more Channels over that connection.
+     * Associate a Channel with this connection and open it for use.
+     *
+     * In AMQP channels are like multi-plexed 'sessions' of work over
+     * a connection. Almost all the interaction with AMQP is done over
+     * a channel.
      * 
-     * \ingroup clientapi
+     * @param channel the channel object to be associated with
+     * this connection. Call Channel::close() to close the channel.
+     */
+    void openChannel(Channel&);
+
+
+    // TODO aconway 2007-01-26: can these be private?
+    void send(framing::AMQFrame*);
+    void received(framing::AMQFrame*);
+    void idleOut();
+    void idleIn();
+    void shutdown();
+
+    /**
+     * @return the maximum frame size in use on this connection
      */
-    class Connection : public virtual qpid::framing::InputHandler, 
-        public virtual qpid::sys::TimeoutHandler, 
-        public virtual qpid::sys::ShutdownHandler, 
-        private virtual qpid::framing::BodyHandler
-    {
-
-        typedef std::map<int, Channel*>::iterator iterator;
-
-	static u_int16_t channelIdCounter;
-
-	std::string host;
-	int port;
-	const u_int32_t max_frame_size;
-	std::map<int, Channel*> channels; 
-	Connector* connector;
-	qpid::framing::OutputHandler* out;
-	ResponseHandler responses;
-        volatile bool closed;
-        framing::ProtocolVersion version;
-        framing::Requester requester;
-        framing::Responder responder;
-
-        void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e);
-        void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
-        void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
-	void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
-
-        // FIXME aconway 2007-01-19: Use channel(0) not connection
-        // to handle channel 0 requests. Remove handler methods.
-        //
-        void handleRequest(framing::AMQRequestBody::shared_ptr);
-        void handleResponse(framing::AMQResponseBody::shared_ptr);
-        void handleMethod(framing::AMQMethodBody::shared_ptr);
-	void handleHeader(framing::AMQHeaderBody::shared_ptr);
-	void handleContent(framing::AMQContentBody::shared_ptr);
-	void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr);
-        void handleFrame(framing::AMQFrame* frame);
-
-    public:
-        /**
-         * Creates a connection object, but does not open the
-         * connection.  
-         * 
-         * @param _version the version of the protocol to connect with
-	 *
-         * @param debug turns on tracing for the connection
-         * (i.e. prints details of the frames sent and received to std
-         * out). Optional and defaults to false.
-         * 
-         * @param max_frame_size the maximum frame size that the
-         * client will accept. Optional and defaults to 65536.
-         */
-	Connection( bool debug = false, u_int32_t max_frame_size = 65536, 
-			framing::ProtocolVersion* _version = &(framing::highestProtocolVersion));
-	~Connection();
-
-        /**
-         * Opens a connection to a broker.
-         * 
-         * @param host the host on which the broker is running
-         * 
-         * @param port the port on the which the broker is listening
-         * 
-         * @param uid the userid to connect with
-         * 
-         * @param pwd the password to connect with (currently SASL
-         * PLAIN is the only authentication method supported so this
-         * is sent in clear text)
-         * 
-         * @param virtualhost the AMQP virtual host to use (virtual
-         * hosts, where implemented(!), provide namespace partitioning
-         * within a single broker).
-         */
-        void open(const std::string& host, int port = 5672, 
-                  const std::string& uid = "guest", const std::string& pwd = "guest", 
-                  const std::string& virtualhost = "/");
-        /**
-         * Closes the connection. Any further use of this connection
-         * (without reopening it) will not succeed.
-         */
-        void close();
-        /**
-         * Opens a Channel. In AMQP channels are like multi-plexed
-         * 'sessions' of work over a connection. Almost all the
-         * interaction with AMQP is done over a channel.
-         * 
-         * @param channel a pointer to a channel instance that will be
-         * used to represent the new channel.
-         */
-	void openChannel(Channel* channel);
-	/*
-         * Requests that the server close this channel, then removes
-         * the association to the channel from this connection
-         *
-         * @param channel a pointer to the channel instance to close
-         */
-	void closeChannel(Channel* channel);
-	/*
-         * Removes the channel from association with this connection,
-	 * without sending a close request to the server.
-         *
-         * @param channel a pointer to the channel instance to
-         * disassociate
-         */
-	void removeChannel(Channel* channel);
-
-	virtual void received(framing::AMQFrame* frame);
-
-	virtual void idleOut();
-	virtual void idleIn();
-
-	virtual void shutdown();
-
-        /**
-         * @return the maximum frame size in use on this connection
-         */
-	inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
-    };
+    inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+
+    /** @return protocol version in use on this connection. */ 
+    const framing::ProtocolVersion& getVersion() { return version; }
+};
 
 }
 }