You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/02 20:09:51 UTC

svn commit: r562212 [1/2] - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/qpid/

Author: gsim
Date: Thu Aug  2 11:09:48 2007
New Revision: 562212

URL: http://svn.apache.org/viewvc?view=rev&rev=562212
Log:
Some restructuring of the client code: 
 * Introduced three separate 'handlers' for the connection, channel and execution 'layers'.
 * Support for asynchronous retrieval of response or completion status.
 * Channel methods no longer included in execution layers command id count.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/IncomingMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/IncomingMessage.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
    incubator/qpid/trunk/qpid/python/qpid/message.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/spec.py

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Aug  2 11:09:48 2007
@@ -229,12 +229,21 @@
   qpid/client/ClientChannel.cpp			\
   qpid/client/ClientExchange.cpp		\
   qpid/client/ClientQueue.cpp			\
-  qpid/client/BasicMessageChannel.cpp		\
   qpid/client/Connector.cpp			\
-  qpid/client/IncomingMessage.cpp		\
   qpid/client/MessageListener.cpp		\
   qpid/client/ResponseHandler.cpp		\
-  qpid/client/ReturnedMessageHandler.cpp
+  qpid/client/ReturnedMessageHandler.cpp	\
+  qpid/client/Correlator.cpp			\
+  qpid/client/CompletionTracker.cpp		\
+  qpid/client/ChannelHandler.cpp		\
+  qpid/client/ConnectionHandler.cpp		\
+  qpid/client/ExecutionHandler.cpp		\
+  qpid/client/FutureCompletion.cpp		\
+  qpid/client/FutureResponse.cpp		\
+  qpid/client/FutureFactory.cpp			\
+  qpid/client/ReceivedContent.cpp		\
+  qpid/client/StateManager.cpp
+
 
 nobase_include_HEADERS = \
   $(platform_hdr) \
@@ -306,19 +315,29 @@
   qpid/broker/TransactionalStore.h \
   qpid/broker/TxAck.h \
   qpid/client/AckMode.h \
-  qpid/client/BasicMessageChannel.h \
   qpid/client/ClientChannel.h \
   qpid/client/ClientExchange.h \
   qpid/client/ClientMessage.h \
   qpid/client/ClientQueue.h \
   qpid/client/Connection.h \
   qpid/client/Connector.h \
-  qpid/client/IncomingMessage.h \
   qpid/client/MessageChannel.h \
   qpid/client/MessageListener.h \
   qpid/client/MethodBodyInstances.h \
   qpid/client/ResponseHandler.h \
   qpid/client/ReturnedMessageHandler.h \
+  qpid/client/BlockingQueue.h \
+  qpid/client/Correlator.h \
+  qpid/client/CompletionTracker.h \
+  qpid/client/ChannelHandler.h \
+  qpid/client/ChainableFrameHandler.h	\
+  qpid/client/ConnectionHandler.h \
+  qpid/client/ExecutionHandler.h \
+  qpid/client/FutureCompletion.h \
+  qpid/client/FutureResponse.h \
+  qpid/client/FutureFactory.h \
+  qpid/client/ReceivedContent.h \
+  qpid/client/StateManager.h \
   qpid/framing/AMQBody.h \
   qpid/framing/AMQContentBody.h \
   qpid/framing/AMQDataBlock.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Aug  2 11:09:48 2007
@@ -68,7 +68,11 @@
         handleL4(method, context);
         //(if the frameset is complete) we can move the execution-mark
         //forward 
-        ++(incoming.hwm);
+
+        //temporary hack until channel management is moved to its own handler:
+        if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+            ++(incoming.hwm);
+        }
 
         //note: need to be more sophisticated than this if we execute
         //commands that arrive within an active message frameset (that
@@ -175,8 +179,11 @@
     Mutex::ScopedLock l(outLock);
     uint8_t type(body->type());
     if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
-        ++outgoing.hwm;
-        //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body  << std::endl;
+        //temporary hack until channel management is moved to its own handler:
+        if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+            ++outgoing.hwm;
+            //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body  << std::endl;
+        }
     }
     return ChannelAdapter::send(body, action);
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _BlockingQueue_
+#define _BlockingQueue_
+
+#include <queue>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+struct QueueClosed {};
+
+template <class T>
+class BlockingQueue 
+{
+    sys::Monitor lock;
+    std::queue<T> queue;
+    bool closed;
+ 
+public:
+
+    BlockingQueue() : closed(false) {}
+
+    void reset() 
+    {
+        sys::Monitor::ScopedLock l(lock);
+        closed = true; 
+    }
+
+    T pop()
+    {
+        sys::Monitor::ScopedLock l(lock);
+        while (!closed && queue.empty()) {
+            lock.wait();
+        } 
+        if (closed) {
+            throw QueueClosed();
+        } else {
+            T t = queue.front();
+            queue.pop();
+            return t;
+        }
+    }
+
+    void push(T t)
+    {
+        sys::Monitor::ScopedLock l(lock);
+        bool wasEmpty = queue.empty();
+        queue.push(t);
+        if (wasEmpty) {
+            lock.notifyAll();
+        }
+    }
+
+    void close()
+    {
+        sys::Monitor::ScopedLock l(lock);
+        closed = true;
+        lock.notifyAll();
+    }
+};
+
+}}
+
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ChainableFrameHandler_
+#define _ChainableFrameHandler_
+
+#include <boost/function.hpp>
+#include "qpid/framing/AMQFrame.h"
+
+namespace qpid {
+namespace client {
+
+struct ChainableFrameHandler
+{
+    typedef boost::function<void(framing::AMQFrame&)> FrameDelegate;
+
+    FrameDelegate in;
+    FrameDelegate out;
+
+    ChainableFrameHandler() {}
+    ChainableFrameHandler(FrameDelegate i, FrameDelegate o): in(i), out(o) {}
+    virtual ~ChainableFrameHandler() {}
+};
+
+}}
+
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ChannelHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
+
+void ChannelHandler::incoming(AMQFrame& frame)
+{
+    AMQBody::shared_ptr body = frame.getBody();
+    if (getState() == OPEN) {
+        if (isA<ChannelCloseBody>(body)) {
+            ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+            setState(CLOSED);
+            if (onClose) {
+                onClose(method->getReplyCode(), method->getReplyText());
+            }
+        } else {
+            try {
+                in(frame);
+            }catch(ChannelException& e){
+                if (body->type() == METHOD_BODY) {
+                    AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+                    close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+                } else {
+                    close(e.code, e.toString(), 0, 0);
+                }
+            }
+        }
+    } else {
+        if (body->type() == METHOD_BODY) {
+            handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+        } else {
+            throw new ConnectionException(504, "Channel not open.");
+        }
+
+    }
+}
+
+void ChannelHandler::outgoing(AMQFrame& frame)
+{
+    if (getState() == OPEN) {
+        frame.channel = id;
+        out(frame);
+    } else {
+        throw Exception("Channel not open");
+    }
+}
+
+void ChannelHandler::open(uint16_t _id)
+{
+    id = _id;
+
+    setState(OPENING);
+    AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+    out(f);
+
+    std::set<int> states;
+    states.insert(OPEN);
+    states.insert(CLOSED);
+    waitFor(states);
+    if (getState() != OPEN) {
+        throw Exception("Failed to open channel.");
+    }
+}
+
+void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+    setState(CLOSING);
+    AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+    out(f);
+}
+
+void ChannelHandler::close()
+{
+    close(200, "OK", 0, 0);
+    waitFor(CLOSED);
+}
+
+void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+{
+    switch (getState()) {
+    case OPENING:
+        if (method->isA<ChannelOpenOkBody>()) {
+            setState(OPEN);
+        } else {
+            throw ConnectionException(504, "Channel not opened.");
+        }
+        break;
+    case CLOSING:
+        if (method->isA<ChannelCloseOkBody>()) {
+            setState(CLOSED);
+        } //else just ignore it
+        break;
+    case CLOSED:
+        throw ConnectionException(504, "Channel not opened.");
+    default:
+        throw Exception("Unexpected state encountered in ChannelHandler!");
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ChannelHandler_
+#define _ChannelHandler_
+
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+namespace qpid {
+namespace client {
+
+class ChannelHandler : private StateManager, public ChainableFrameHandler
+{
+    enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+    framing::ProtocolVersion version;
+    uint16_t id;
+
+    void handleMethod(framing::AMQMethodBody::shared_ptr method);
+
+    template <class T> bool isA(framing::AMQBody::shared_ptr body) {
+        return body->type() == framing::METHOD_BODY && 
+            boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
+    }
+
+
+    void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
+
+
+public:
+    typedef boost::function<void(uint16_t, const std::string&)> CloseListener;    
+
+    ChannelHandler();
+
+    void incoming(framing::AMQFrame& frame);
+    void outgoing(framing::AMQFrame& frame);
+
+    void open(uint16_t id);
+    void close();
+
+    CloseListener onClose;
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Aug  2 11:09:48 2007
@@ -24,9 +24,12 @@
 #include "qpid/sys/Monitor.h"
 #include "ClientMessage.h"
 #include "qpid/QpidError.h"
-#include "MethodBodyInstances.h"
 #include "Connection.h"
-#include "BasicMessageChannel.h"
+#include "ConnectionHandler.h"
+#include "FutureResponse.h"
+#include "MessageListener.h"
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
 
 // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
 // handling of errors that should close the connection or the channel.
@@ -45,18 +48,13 @@
 
 }}
 
-Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
     connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
 {
-    switch (mode) {
-      case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
-      default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
-    }
 }
 
 Channel::~Channel(){
     closeInternal();
-    stop();
 }
 
 void Channel::open(ChannelId id, Connection& con)
@@ -64,65 +62,15 @@
     if (isOpen())
         THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
     connection = &con;
-    init(id, con, con.getVersion()); // ChannelAdapter initialization.
-    string oob;
-    if (id != 0) 
-        sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
-}
-
-void Channel::protocolInit(
-    const std::string& uid, const std::string& pwd, const std::string& vhost) {
-    assert(connection);
-    responses.expect();
-    connection->connector->init(); // Send ProtocolInit block.
-    ConnectionStartBody::shared_ptr connectionStart =
-        responses.receive<ConnectionStartBody>();
-
-    FieldTable props;
-    string mechanism("PLAIN");
-    string response = ((char)0) + uid + ((char)0) + pwd;
-    string locale("en_US");
-    ConnectionTuneBody::shared_ptr proposal =
-        sendAndReceive<ConnectionTuneBody>(
-            make_shared_ptr(new ConnectionStartOkBody(
-                version, //connectionStart->getRequestId(),
-                props, mechanism,
-                response, locale)));
-
-    /**
-     * Assume for now that further challenges will not be required
-     //receive connection.secure
-     responses.receive(connection_secure));
-     //send connection.secure-ok
-     connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
-    **/
-
-    sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
-             version, //proposal->getRequestId(),
-             proposal->getChannelMax(), connection->getMaxFrameSize(),
-             proposal->getHeartbeat())));
-    
-    uint16_t heartbeat = proposal->getHeartbeat();
-    connection->connector->setReadTimeout(heartbeat * 2);
-    connection->connector->setWriteTimeout(heartbeat);
-
-    // Send connection open.
-    std::string capabilities;
-    responses.expect();
-    sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
-    //receive connection.open-ok (or redirect, but ignore that for now
-    //esp. as using force=true).
-    AMQMethodBody::shared_ptr openResponse = responses.receive();
-    if(openResponse->isA<ConnectionOpenOkBody>()) {
-        //ok
-    }else if(openResponse->isA<ConnectionRedirectBody>()){
-        //ignore for now
-        ConnectionRedirectBody::shared_ptr redirect(
-            shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
-        QPID_LOG(error, "Ignoring redirect to " << redirect->getHost());
-    } else {
-        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
-    }
+    channelId = id;
+    //link up handlers:
+    channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
+    channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
+    executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
+    //set up close notification:
+    channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
+
+    channelHandler.open(id);
 }
     
 bool Channel::isOpen() const { 
@@ -131,7 +79,11 @@
 }
 
 void Channel::setQos() {
-    messaging->setQos();
+    executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+    if(isTransactional()) {
+        //I think this is wrong! should only send TxSelect once...
+        executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+    }
 }
 
 void Channel::setPrefetch(uint16_t _prefetch){
@@ -143,14 +95,12 @@
     string name = exchange.getName();
     string type = exchange.getType();
     FieldTable args;
-    send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
-    if (synch) synchWithServer();
+    sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
 }
 
 void Channel::deleteExchange(Exchange& exchange, bool synch){
     string name = exchange.getName();
-    send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
-    if (synch) synchWithServer();
+    sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
 }
 
 void Channel::declareQueue(Queue& queue, bool synch){
@@ -179,131 +129,41 @@
 void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
     string e = exchange.getName();
     string q = queue.getName();
-    send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
-    if (synch) synchWithServer();
+    sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
 }
 
 void Channel::commit(){
-    send(make_shared_ptr(new TxCommitBody(version)));
+    executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
 }
 
 void Channel::rollback(){
-    send(make_shared_ptr(new TxRollbackBody(version)));
-}
-
-void Channel::handleMethodInContext(
-AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
-{
-    // Special case for consume OK as it is both an expected response
-    // and needs handling in this thread.
-    if (method->isA<BasicConsumeOkBody>()) {
-        messaging->handle(method);
-        responses.signalResponse(method);
-        return;
-    }
-    if(responses.isWaiting()) {
-        responses.signalResponse(method);
-        return;
-    }
-    try {
-        switch (method->amqpClassId()) {
-          case MessageTransferBody::CLASS_ID: 
-          case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
-          case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
-          case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
-          case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
-          default: throw UnknownMethod();
-        }
-    }
-    catch (const UnknownMethod&) {
-                connection->close(
-                    504, "Unknown method",
-                    method->amqpClassId(), method->amqpMethodId());
-            }
-        }
-
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
-    switch (method->amqpMethodId()) {
-      case ChannelCloseBody::METHOD_ID:
-          sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
-        peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
-        return;
-      case ChannelFlowBody::METHOD_ID:
-        // FIXME aconway 2007-02-22: Not yet implemented.
-        return;
-    }
-    throw UnknownMethod();
-}
-
-void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
-    if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
-        connection->close();
-        return;
-    } 
-    throw UnknownMethod();
-}
-
-void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
-    if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
-        Monitor::ScopedLock l(outgoingMonitor);
-        //record the completion mark:
-        outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
-        //TODO: notify anyone waiting for completion notification:
-        outgoingMonitor.notifyAll();
-    } else{
-        throw UnknownMethod();
-    }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
-    messaging->handle(body);
-}
-    
-void Channel::handleContent(AMQContentBody::shared_ptr body){
-    messaging->handle(body);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-    THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+    executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
 }
 
-void Channel::start(){
-    running = true;
-    dispatcher = Thread(*messaging);
-}
-
-// Close called by local application.
-void Channel::close(
-    uint16_t code, const std::string& text,
-    ClassId classId, MethodId methodId)
+void Channel::close()
 {
-    if (isOpen()) {
-        try {
-            if (getId() != 0) {
-                if (code == 200) messaging->cancelAll();
-
-                sendAndReceive<ChannelCloseOkBody>(
-                    make_shared_ptr(new ChannelCloseBody(
-                                        version, code, text, classId, methodId)));
-            }
-            static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
-            closeInternal();
-        } catch (...) {
-            static_cast<ConnectionForChannel*>(connection)->erase(getId()); 
-            closeInternal();
-            throw;
+    channelHandler.close();
+    {
+        Mutex::ScopedLock l(lock);
+        if (connection);
+        {
+            connection->erase(channelId);
+            connection = 0;
         }
     }
     stop();
 }
 
+
 // Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
+void Channel::peerClose(uint16_t code, const std::string& message) {
     assert(isOpen());
     //record reason:
-    errorCode = reason->getReplyCode();
-    errorText = reason->getReplyText();
+    errorCode = code;
+    errorText = message;
     closeInternal();
+    stop();
+    futures.close(code, message);
 }
 
 void Channel::closeInternal() {
@@ -311,26 +171,26 @@
     if (connection);
     {
         connection = 0;
-        messaging->close();
-        // A 0 response means we are closed.
-        responses.signalResponse(AMQMethodBody::shared_ptr());
     }
 }
 
-void Channel::stop() {
-    Mutex::ScopedLock l(stopLock);
-    if(running) {
-        dispatcher.join();
-        running = false;
-    }
+AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
+{
+
+    boost::shared_ptr<FutureResponse> fr(futures.createResponse());
+    executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
+    return fr->getResponse();
 }
 
-AMQMethodBody::shared_ptr Channel::sendAndReceive(
-    AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
+void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
 {
-    responses.expect();
-    sendCommand(toSend);
-    return responses.receive(c, m);
+    if(sync) {
+        boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
+        executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
+        fc->waitForCompletion();
+    } else {
+        executionHandler.send(command);
+    }
 }
 
 AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
@@ -339,68 +199,138 @@
     if(sync)
         return sendAndReceive(body, c, m);
     else {
-        sendCommand(body);
+        executionHandler.send(body);
         return AMQMethodBody::shared_ptr();
     }
 }
 
 void Channel::consume(
-    Queue& queue, std::string& tag, MessageListener* listener, 
+    Queue& queue, const std::string& tag, MessageListener* listener, 
     AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
-    messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+
+    if (tag.empty()) {
+        throw Exception("A tag must be specified for a consumer."); 
+    }
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i != consumers.end())
+            throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+        Consumer& c = consumers[tag];
+        c.listener = listener;
+        c.ackMode = ackMode;
+        c.lastDeliveryTag = 0;
+    }
+    sendAndReceiveSync<BasicConsumeOkBody>(
+            synch,
+            make_shared_ptr(new BasicConsumeBody(
+                version, 0, queue.getName(), tag, noLocal,
+                ackMode == NO_ACK, false, !synch,
+                fields ? *fields : FieldTable())));
 }
         
 void Channel::cancel(const std::string& tag, bool synch) {
-    messaging->cancel(tag, synch);
+    Consumer c;
+    {
+        Mutex::ScopedLock l(lock);
+        ConsumerMap::iterator i = consumers.find(tag);
+        if (i == consumers.end())
+            return;
+        c = i->second;
+        consumers.erase(i);
+    }
+    sendAndReceiveSync<BasicCancelOkBody>(
+        synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
 }
 
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-    bool result = messaging->get(msg, queue, ackMode);
-    if (!isOpen()) {
-        throw ChannelException(errorCode, errorText);
+
+    AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
+    AMQMethodBody::shared_ptr response = sendAndReceive(request);
+    if (response && response->isA<BasicGetEmptyBody>()) {
+        return false;
+    } else {
+        ReceivedContent::shared_ptr content = gets.pop();
+        content->populate(msg);
+        return true;
     }
-    return result;
 }
 
 void Channel::publish(const Message& msg, const Exchange& exchange,
                       const std::string& routingKey, 
                       bool mandatory, bool immediate) {
-    messaging->publish(msg, exchange, routingKey, mandatory, immediate);
-}
 
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
-    messaging->setReturnedMessageHandler(handler);
-}
+    const string e = exchange.getName();
+    string key = routingKey;
 
-void Channel::run() {
-    messaging->run();
+    executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), 
+                                 msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
+    /*
+    // Make a header for the message
+    AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+    BasicHeaderProperties::copy(
+        *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+    header->setContentSize(msg.getData().size());
+
+    executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+    executionHandler.sendContent(header);
+    string data = msg.getData();
+    u_int64_t data_length = data.length();
+    if(data_length > 0){
+        //frame itself uses 8 bytes
+        u_int32_t frag_size = connection->getMaxFrameSize() - 8;
+        if(data_length < frag_size){
+            executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
+        }else{
+            u_int32_t offset = 0;
+            u_int32_t remaining = data_length - offset;
+            while (remaining > 0) {
+                u_int32_t length = remaining > frag_size ? frag_size : remaining;
+                string frag(data.substr(offset, length));
+                executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));                          
+                
+                offset += length;
+                remaining = data_length - offset;
+            }
+        }
+    }
+    */
 }
 
-void Channel::sendCommand(AMQBody::shared_ptr body)
-{
-    ++(outgoing.hwm);
-    send(body);
+void Channel::start(){
+    running = true;
+    dispatcher = Thread(*this);
 }
 
-bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
-{
-    AbsTime end;
-    if (timeout == 0) {
-        end = AbsTime::FarFuture();
-    } else {
-        end = AbsTime(AbsTime::now(), timeout);
-    }
-
-    Monitor::ScopedLock l(outgoingMonitor);
-    while (end > AbsTime::now() && outgoing.lwm < poi) {
-        outgoingMonitor.wait(end);
+void Channel::stop() {
+    executionHandler.received.close();
+    gets.close();
+    Mutex::ScopedLock l(stopLock);
+    if(running) {
+        dispatcher.join();
+        running = false;
     }
-    return !(outgoing.lwm < poi);
 }
 
-bool Channel::synchWithServer(Duration timeout) 
-{
-    send(make_shared_ptr(new ExecutionFlushBody(version)));
-    return waitForCompletion(outgoing.hwm, timeout);
+void Channel::run() {
+    try {
+        while (true) {
+            ReceivedContent::shared_ptr content = executionHandler.received.pop();
+            //need to dispatch this to the relevant listener:
+            if (content->isA<BasicDeliverBody>()) {
+                ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
+                if (i != consumers.end()) {
+                    Message msg;
+                    content->populate(msg);
+                    i->second.listener->received(msg);
+                } else {
+                    QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());                        
+                }               
+            } else if (content->isA<BasicGetOkBody>()) {
+                gets.push(content);
+            } else {
+                QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());                        
+            }
+        }
+    } catch (const QueueClosed&) {}
 }
-

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Aug  2 11:09:48 2007
@@ -26,10 +26,12 @@
 #include "ClientExchange.h"
 #include "ClientMessage.h"
 #include "ClientQueue.h"
-#include "ResponseHandler.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
 #include "qpid/Exception.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "AckMode.h"
 
@@ -54,19 +56,23 @@
  * 
  * \ingroup clientapi
  */
-class Channel : public framing::ChannelAdapter
+class Channel : private sys::Runnable
 {
   private:
     struct UnknownMethod {};
     typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+
+    struct Consumer{
+        MessageListener* listener;
+        AckMode ackMode;
+        int count;
+        u_int64_t lastDeliveryTag;
+    };
+    typedef std::map<std::string, Consumer> ConsumerMap;
         
     mutable sys::Mutex lock;
-    boost::scoped_ptr<MessageChannel> messaging;
     Connection* connection;
     sys::Thread dispatcher;
-    ResponseHandler responses;
-    sys::Monitor outgoingMonitor;
-    framing::Window outgoing;
 
     uint16_t prefetch;
     const bool transactional;
@@ -78,32 +84,29 @@
     sys::Mutex stopLock;
     bool running;
 
-    void stop();
+    ConsumerMap consumers;
+    ExecutionHandler executionHandler;
+    ChannelHandler channelHandler;
+    framing::ChannelId channelId;
+    BlockingQueue<ReceivedContent::shared_ptr> gets;
+    FutureFactory futures;
 
-    void handleHeader(framing::AMQHeaderBody::shared_ptr body);
-    void handleContent(framing::AMQContentBody::shared_ptr body);
-    void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
-    void handleMethodInContext(
-        framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
-    void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
-    void handleConnection(framing::AMQMethodBody::shared_ptr method);
-    void handleExecution(framing::AMQMethodBody::shared_ptr method);
+    void stop();
 
     void setQos();
-
-    void protocolInit(
-        const std::string& uid, const std::string& pwd,
-        const std::string& vhost);
     
     framing::AMQMethodBody::shared_ptr sendAndReceive(
         framing::AMQMethodBody::shared_ptr,
-        framing::ClassId, framing::MethodId);
+        framing::ClassId = 0, framing::MethodId = 0);
 
     framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
         bool sync,
         framing::AMQMethodBody::shared_ptr,
         framing::ClassId, framing::MethodId);
 
+    void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
+
+
     template <class BodyType>
     boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
         return boost::shared_polymorphic_downcast<BodyType>(
@@ -118,21 +121,16 @@
                 sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
     }
 
-    void sendCommand(framing::AMQBody::shared_ptr body);
-
     void open(framing::ChannelId, Connection&);
     void closeInternal();
-    void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
-    bool waitForCompletion(framing::SequenceNumber, sys::Duration);
-    
+    void peerClose(uint16_t, const std::string&);
+
     // FIXME aconway 2007-02-23: Get rid of friendships.
-  friend class Connection;
-  friend class BasicMessageChannel; // for sendAndReceive.
-  friend class MessageMessageChannel; // for sendAndReceive.
+    friend class Connection;
+    friend class BasicMessageChannel; // for sendAndReceive.
+    friend class MessageMessageChannel; // for sendAndReceive.
         
   public:
-    enum InteropMode { AMQP_08, AMQP_09 };
-
     /**
      * Creates a channel object.
      * 
@@ -143,16 +141,10 @@
      * @param prefetch specifies the number of unacknowledged
      * messages the channel is willing to have sent to it
      * asynchronously
-     *
-     * @param messageImpl Alternate messaging implementation class to
-     * allow alternate protocol implementations of messaging
-     * operations. Takes ownership.
-     */
-    Channel(
-        bool transactional = false, u_int16_t prefetch = 500,
-        InteropMode=AMQP_08);
+     */
+    Channel(bool transactional = false, u_int16_t prefetch = 500);
      
-    ~Channel();
+    ~Channel();    
 
     /**
      * Declares an exchange.
@@ -254,12 +246,10 @@
     void start();
 
     /**
-     * Close the channel with optional error information.
-     * Closing a channel that is not open has no effect.
+     * Close the channel. Closing a channel that is not open has no
+     * effect.
      */
-    void close(
-        framing::ReplyCode = 200, const std::string& ="OK",
-        framing::ClassId = 0, framing::MethodId  = 0);
+    void close();
 
     /** True if the channel is transactional */
     bool isTransactional() { return transactional; }
@@ -301,7 +291,7 @@
      * is received from the broker
      */
     void consume(
-        Queue& queue, std::string& tag, MessageListener* listener, 
+        Queue& queue, const std::string& tag, MessageListener* listener, 
         AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
         const framing::FieldTable* fields = 0);
         
@@ -353,22 +343,9 @@
                  bool mandatory = false, bool immediate = false);
 
     /**
-     * Set a handler for this channel that will process any
-     * returned messages
-     * 
-     * @see publish()
-     */
-    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
-    /**
-     * Deliver messages from the broker to the appropriate MessageListener. 
+     * Deliver incoming messages to the appropriate MessageListener. 
      */
     void run();
-
-    /**
-     * TESTING ONLY FOR NOW!
-     */
-    bool synchWithServer(sys::Duration timeout = 0);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Aug  2 11:09:48 2007
@@ -46,11 +46,13 @@
 Connection::Connection(
     bool _debug, uint32_t _max_frame_size,
     framing::ProtocolVersion _version
-) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+    ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
     defaultConnector(version, _debug, _max_frame_size),
     isOpen(false), debug(_debug)
 {
     setConnector(defaultConnector);
+
+    handler.maxFrameSize = _max_frame_size;
 }
 
 Connection::~Connection(){}
@@ -58,7 +60,7 @@
 void Connection::setConnector(Connector& con)
 {
     connector = &con;
-    connector->setInputHandler(this);
+    connector->setInputHandler(&handler);
     connector->setTimeoutHandler(this);
     connector->setShutdownHandler(this);
     out = connector->getOutputHandler();
@@ -70,10 +72,19 @@
 {
     if (isOpen)
         THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+
+    //wire up the handler:
+    handler.in = boost::bind(&Connection::received, this, _1);
+    handler.out = boost::bind(&Connector::send, connector, _1);
+    handler.onClose = boost::bind(&Connection::closeChannels, this);
+
+    handler.uid = uid;
+    handler.pwd = pwd;
+    handler.vhost = vhost;
+
     connector->connect(host, port);
-    channels[0] = &channel0;
-    channel0.open(0, *this);
-    channel0.protocolInit(uid, pwd, vhost);
+    connector->init();
+    handler.waitForOpen();
     isOpen = true;
 }
 
@@ -87,14 +98,12 @@
 }
         
 void Connection::close(
-    ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+    ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
 )
 {
     if(markClosed()) {
         try {
-            channel0.sendAndReceive<ConnectionCloseOkBody>(
-                make_shared_ptr(new ConnectionCloseBody(
-                                    getVersion(), code, msg, classId, methodId)));
+            handler.close();
         } catch (const std::exception& e) {
             QPID_LOG(error, "Exception closing channel: " << e.what());
         }
@@ -138,33 +147,14 @@
 void Connection::received(AMQFrame& frame){
     ChannelId id = frame.getChannel();
     Channel* channel = channels[id];
-    if (channel == 0)
-        THROW_QPID_ERROR(
-            PROTOCOL_ERROR+504,
-            (boost::format("Invalid channel number %g") % id).str());
-    try{
-        channel->getHandlers().in->handle(frame);
-    }catch(const qpid::QpidError& e){
-        std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
-        channelException(
-            *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
+    if (channel == 0) {
+        throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
     }
+    channel->channelHandler.incoming(frame);
 }
 
 void Connection::send(AMQFrame& frame) {
     out->send(frame);
-}
-
-void Connection::channelException(
-    Channel& channel, AMQMethodBody* method, const QpidError& e)
-{
-    int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
-    string msg = e.msg;
-    if(method == 0)
-        channel.close(code, msg);
-    else
-        channel.close(
-            code, msg, method->amqpClassId(), method->amqpMethodId());
 }
 
 void Connection::idleIn(){

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "CompletionTracker.h"
+
+using qpid::client::CompletionTracker;
+using namespace qpid::framing;
+using namespace boost;
+
+CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+
+
+void CompletionTracker::completed(const SequenceNumber& _mark)
+{   
+    sys::Mutex::ScopedLock l(lock);
+    mark = _mark;
+    while (!listeners.empty() && !(listeners.front().first > mark)) {
+        Listener f(listeners.front().second);
+        {
+            sys::Mutex::ScopedUnlock u(lock);
+            f();
+        }
+        listeners.pop();
+    }
+}
+
+void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+{
+    if (!add(point, listener)) {
+        listener();
+    }
+}
+
+bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+{
+    sys::Mutex::ScopedLock l(lock);
+    if (point < mark) {
+        return false;
+    } else {
+        listeners.push(make_pair(point, listener));
+        return true;
+    }
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <queue>
+#include <boost/function.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+
+#ifndef _CompletionTracker_
+#define _CompletionTracker_
+
+namespace qpid {
+namespace client {
+
+class CompletionTracker
+{
+public:
+    typedef boost::function<void()> Listener;    
+
+    CompletionTracker();
+    CompletionTracker(const framing::SequenceNumber& mark);
+    void completed(const framing::SequenceNumber& mark);
+    void listen(const framing::SequenceNumber& point, Listener l);
+
+private:
+    sys::Mutex lock;
+    framing::SequenceNumber mark;
+    std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+
+    bool add(const framing::SequenceNumber& point, Listener l);
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Aug  2 11:09:48 2007
@@ -26,6 +26,7 @@
 #include "qpid/QpidError.h"
 #include "ClientChannel.h"
 #include "Connector.h"
+#include "ConnectionHandler.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/ShutdownHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
@@ -79,17 +80,15 @@
     framing::ProtocolVersion version;
     const uint32_t max_frame_size;
     ChannelMap channels;
+    ConnectionHandler handler;
     Connector defaultConnector;
     Connector* connector;
     framing::OutputHandler* out;
     bool isOpen;
     sys::Mutex shutdownLock;
-    Channel channel0;
     bool debug;
         
     void erase(framing::ChannelId);
-    void channelException(
-        Channel&, framing::AMQMethodBody*, const QpidError&);
     void closeChannels();
     bool markClosed();
 
@@ -174,7 +173,7 @@
     inline uint32_t getMaxFrameSize(){ return max_frame_size; }
 
     /** @return protocol version in use on this connection. */ 
-    framing::ProtocolVersion getVersion() const { return version; }
+    //framing::ProtocolVersion getVersion() const { return version; }
 };
 
 }} // namespace qpid::client

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+namespace {
+const std::string OK("OK");
+}
+
+ConnectionHandler::ConnectionHandler() 
+    : StateManager(NOT_STARTED) 
+{
+
+    mechanism = "PLAIN";
+    locale = "en_US";
+    heartbeat = 0; 
+    maxChannels = 32767; 
+    maxFrameSize = 65536; 
+    insist = true;
+    version = framing::highestProtocolVersion;
+
+    ESTABLISHED.insert(FAILED);
+    ESTABLISHED.insert(OPEN);
+} 
+
+void ConnectionHandler::incoming(AMQFrame& frame)
+{
+    if (getState() == CLOSED) {
+        throw Exception("Connection is closed.");        
+    }
+
+    AMQBody::shared_ptr body = frame.getBody();
+    if (frame.getChannel() == 0) {
+        if (body->type() == METHOD_BODY) {
+            handle(shared_polymorphic_cast<AMQMethodBody>(body));
+        } else {
+            error(503, "Cannot send content on channel zero.");
+        }
+    } else {
+        switch(getState()) {
+        case OPEN:
+            try {
+                in(frame);
+            }catch(ConnectionException& e){
+                error(e.code, e.toString(), body);
+            }catch(std::exception& e){
+                error(541/*internal error*/, e.what(), body);
+            }
+            break;
+        case CLOSING:
+            QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");        
+            break;
+        default:
+            //must be in connection initialisation:
+            fail("Cannot receive frames on non-zero channel until connection is established.");
+        }
+    }
+}
+
+void ConnectionHandler::outgoing(AMQFrame& frame)
+{
+    if (getState() == OPEN) {
+        out(frame);
+    } else {
+        throw Exception("Connection is not open.");
+    }
+}
+
+void ConnectionHandler::waitForOpen()
+{
+    waitFor(ESTABLISHED);
+    if (getState() == FAILED) {
+        throw Exception("Failed to establish connection.");
+    }
+}
+
+void ConnectionHandler::close()
+{
+    setState(CLOSING);
+    send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
+
+    waitFor(CLOSED);
+}
+
+void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+{
+    AMQFrame f;
+    f.setBody(body);
+    out(f);
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+    setState(CLOSING);
+    send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));    
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+{
+    if (body->type() == METHOD_BODY) {
+        AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+        error(code, message, method->amqpClassId(), method->amqpMethodId());
+    } else {
+        error(code, message);
+    }
+}
+
+
+void ConnectionHandler::fail(const std::string& message)
+{
+    QPID_LOG(error, message);
+    setState(FAILED);
+}
+
+void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+{
+    switch (getState()) {
+    case NOT_STARTED:
+        if (method->isA<ConnectionStartBody>()) {
+            setState(NEGOTIATING);
+            string response = ((char)0) + uid + ((char)0) + pwd;
+            send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+        } else {
+            fail("Bad method sequence, expected connection-start.");
+        }
+        break;
+    case NEGOTIATING:
+        if (method->isA<ConnectionTuneBody>()) {
+            ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+            heartbeat = proposal->getHeartbeat();
+            maxChannels = proposal->getChannelMax();    
+            send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+            setState(OPENING);
+            send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
+        //TODO: support for further security challenges
+        //} else if (method->isA<ConnectionSecureBody>()) {
+        } else {
+            fail("Unexpected method sequence, expected connection-tune.");
+        }
+        break;
+    case OPENING:
+        if (method->isA<ConnectionOpenOkBody>()) {
+            setState(OPEN);
+        //TODO: support for redirection    
+        //} else if (method->isA<ConnectionRedirectBody>()) {
+        } else {
+            fail("Unexpected method sequence, expected connection-open-ok.");
+        }
+        break;
+    case OPEN:
+        if (method->isA<ConnectionCloseBody>()) {
+            send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+            setState(CLOSED);
+            if (onClose) {
+                onClose();
+            }
+        } else {
+            error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
+        }
+        break;
+    case CLOSING:
+        if (method->isA<ConnectionCloseOkBody>()) {
+            setState(CLOSED);
+            if (onClose) {
+                onClose();
+            }
+        } else {
+            QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");        
+        }
+        break;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionHandler_
+#define _ConnectionHandler_
+
+#include "Connector.h"
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/InputHandler.h"
+
+namespace qpid {
+namespace client {
+
+struct ConnectionProperties
+{
+    std::string uid;
+    std::string pwd;
+    std::string vhost;
+    framing::FieldTable properties;
+    std::string mechanism;
+    std::string locale;
+    std::string capabilities;
+    uint16_t heartbeat;
+    uint16_t maxChannels;
+    uint64_t maxFrameSize;
+    bool insist;
+    framing::ProtocolVersion version;
+};
+
+class ConnectionHandler : private StateManager, 
+     public ConnectionProperties, 
+     public ChainableFrameHandler, 
+     public framing::InputHandler
+{
+    enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
+    std::set<int> ESTABLISHED;
+    
+    void handle(framing::AMQMethodBody::shared_ptr method);
+    void send(framing::AMQBody::shared_ptr body);
+    void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
+    void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+    void fail(const std::string& message);
+
+public:
+    typedef boost::function<void()> CloseListener;    
+
+    ConnectionHandler();
+
+    void received(framing::AMQFrame& f) { incoming(f); } 
+
+    void incoming(framing::AMQFrame& frame);
+    void outgoing(framing::AMQFrame& frame);
+
+    void waitForOpen();
+    void close();
+
+    CloseListener onClose;
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+using qpid::client::Correlator;
+using namespace qpid::framing;
+using namespace boost;
+
+void Correlator::receive(AMQMethodBody::shared_ptr response)
+{
+    if (listeners.empty()) {
+        throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+    } else {
+        Listener l = listeners.front();
+        if (l) l(response);
+        listeners.pop();
+    }
+}
+
+void Correlator::listen(Listener l)
+{
+    listeners.push(l);
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <queue>
+#include <set>
+#include <boost/function.hpp>
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/sys/Monitor.h"
+
+#ifndef _Correlator_
+#define _Correlator_
+
+namespace qpid {
+namespace client {
+
+
+class Correlator
+{
+public:
+    typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+
+    void receive(framing::AMQMethodBody::shared_ptr);
+    void listen(Listener l);
+
+private:
+    std::queue<Listener> listeners;
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExecutionHandler.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+bool isMessageMethod(AMQMethodBody::shared_ptr method)
+{
+    return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
+}
+
+bool isMessageMethod(AMQBody::shared_ptr body)
+{
+    return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+}
+
+bool isContentFrame(AMQFrame& frame)
+{
+    AMQBody::shared_ptr body = frame.getBody();
+    uint8_t type = body->type();
+    return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); 
+}
+
+bool invoke(AMQBody::shared_ptr body, Invocable* target)
+{
+    return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+}
+
+ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {}
+
+//incoming:
+void ExecutionHandler::handle(AMQFrame& frame)
+{
+    AMQBody::shared_ptr body = frame.getBody();
+    if (!invoke(body, this)) {
+        if (isContentFrame(frame)) {
+            if (!arriving) {
+                arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+            }
+            arriving->append(body);
+            if (arriving->isComplete()) {
+                received.push(arriving);
+                arriving.reset();
+            }
+        } else {
+            ++incoming.hwm;    
+            correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+        }        
+    }
+}
+
+void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+{
+    SequenceNumber mark(cumulative);
+    if (outgoing.lwm < mark) {
+        outgoing.lwm = mark;
+        completion.completed(outgoing.lwm);
+    }
+    if (range.size() % 2) { //must be even number        
+        throw ConnectionException(530, "Received odd number of elements in ranged mark");
+    } else {
+        //TODO: need to manage (record and accumulate) ranges such
+        //that we can implictly move the mark when appropriate
+
+        //TODO: signal listeners of early notification?         
+    }
+}
+
+void ExecutionHandler::flush()
+{
+    //send completion
+    incoming.lwm = incoming.hwm;
+    //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+}
+
+void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+{
+    //allocate id:
+    ++outgoing.hwm;
+    //register listeners if necessary:
+    if (f) {
+        completion.listen(outgoing.hwm, f);
+    }
+    if (g) {
+        correlation.listen(g);
+    }
+
+    AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+    out(frame);
+
+    if (f) {
+        AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+        out(frame);        
+    }
+}
+
+void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
+{
+    AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content);
+    out(frame);
+}
+
+void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, 
+                                   uint64_t frameSize,
+                                   CompletionTracker::Listener f, Correlator::Listener g)
+{
+    send(command, f, g);
+
+    AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+    BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
+    header->setContentSize(data.size());
+    AMQFrame h(version, 0, header);
+    out(h);
+
+    u_int64_t data_length = data.length();
+    if(data_length > 0){
+        //frame itself uses 8 bytes
+        u_int32_t frag_size = frameSize - 8;
+        if(data_length < frag_size){
+            AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+            out(frame);
+        }else{
+            u_int32_t offset = 0;
+            u_int32_t remaining = data_length - offset;
+            while (remaining > 0) {
+                u_int32_t length = remaining > frag_size ? frag_size : remaining;
+                string frag(data.substr(offset, length));
+                AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+                out(frame);
+                offset += length;
+                remaining = data_length - offset;
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExecutionHandler_
+#define _ExecutionHandler_
+
+#include <queue>
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "BlockingQueue.h"
+#include "ChainableFrameHandler.h"
+#include "CompletionTracker.h"
+#include "Correlator.h"
+#include "ReceivedContent.h"
+
+namespace qpid {
+namespace client {
+
+class ExecutionHandler : 
+    private framing::AMQP_ServerOperations::ExecutionHandler,
+    public ChainableFrameHandler
+{
+    framing::Window incoming;
+    framing::Window outgoing;
+    ReceivedContent::shared_ptr arriving;
+    Correlator correlation;
+    CompletionTracker completion;
+    framing::ProtocolVersion version;
+
+    void complete(uint32_t mark, framing::SequenceNumberSet range);    
+    void flush();
+
+public:
+    BlockingQueue<ReceivedContent::shared_ptr> received; 
+
+    ExecutionHandler();
+
+    void handle(framing::AMQFrame& frame);
+    void send(framing::AMQBody::shared_ptr command, 
+              CompletionTracker::Listener f = CompletionTracker::Listener(), 
+              Correlator::Listener g = Correlator::Listener());
+    void sendContent(framing::AMQBody::shared_ptr command, 
+                     const framing::BasicHeaderProperties& headers, const std::string& data, 
+                     uint64_t frameSize,
+                     CompletionTracker::Listener f = CompletionTracker::Listener(), 
+                     Correlator::Listener g = Correlator::Listener());
+
+    void sendContent(framing::AMQBody::shared_ptr content);
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureCompletion.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+
+bool FutureCompletion::isComplete()
+{
+    Monitor::ScopedLock l(lock);
+    return complete;
+}
+
+void FutureCompletion::completed()
+{
+    Monitor::ScopedLock l(lock);
+    complete = true;
+    lock.notifyAll();
+}
+
+void FutureCompletion::waitForCompletion()
+{
+    Monitor::ScopedLock l(lock);
+    while (!complete && !closed) {
+        lock.wait();
+    }
+    if (closed) {
+        throw ChannelException(code, text);
+    }
+}
+
+void FutureCompletion::close(uint16_t _code, const std::string& _text)
+{
+    Monitor::ScopedLock l(lock);
+    complete = true;
+    closed = true;
+    code = _code;
+    text = _text;        
+    lock.notifyAll();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureCompletion_
+#define _FutureCompletion_
+
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class FutureCompletion 
+{
+protected:
+    sys::Monitor lock;
+    bool complete;
+    bool closed;
+    uint16_t code;
+    std::string text;
+
+public:
+    FutureCompletion();
+    virtual ~FutureCompletion(){}
+    bool isComplete();
+    void waitForCompletion();
+    void completed();
+    void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp Thu Aug  2 11:09:48 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureFactory.h"
+
+using namespace qpid::client;
+using namespace boost;
+
+shared_ptr<FutureCompletion> FutureFactory::createCompletion()
+{
+    shared_ptr<FutureCompletion> f(new FutureCompletion());
+    weak_ptr<FutureCompletion> w(f);
+    set.push_back(w);
+    return f;
+}
+
+shared_ptr<FutureResponse> FutureFactory::createResponse()
+{
+    shared_ptr<FutureResponse> f(new FutureResponse());
+    weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f));
+    set.push_back(w);
+    return f;
+}
+
+void FutureFactory::close(uint16_t code, const std::string& text)
+{
+    for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) {
+        shared_ptr<FutureCompletion> p = i->lock();
+        if (p) {
+            p->close(code, text);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h Thu Aug  2 11:09:48 2007
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureFactory_
+#define _FutureFactory_
+
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+
+namespace qpid {
+namespace client {
+
+class FutureFactory 
+{
+    typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet;
+    WeakPtrSet set;
+
+public:
+    boost::shared_ptr<FutureCompletion> createCompletion();
+    boost::shared_ptr<FutureResponse> createResponse();
+    void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date