You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/01 06:11:53 UTC

svn commit: r481159 [7/12] - in /incubator/qpid/trunk/qpid/cpp: ./ build-aux/ gen/ lib/ lib/broker/ lib/client/ lib/common/ lib/common/framing/ lib/common/sys/ lib/common/sys/apr/ lib/common/sys/posix/ m4/ src/ src/qpid/ src/qpid/apr/ src/qpid/broker/ ...

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxAck.h>
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+/**
+ * Binds this operation to the accumulated acks and to the list of
+ * delivery records. Both are stored as references, not copied.
+ */
+TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked)
+    : acked(_acked),
+      unacked(_unacked)
+{
+}
+
+/**
+ * Discards, under the given transaction context, every delivery
+ * record in the unacked list that is covered by the accumulated acks.
+ *
+ * @returns true if the whole list was processed; false if anything
+ * threw, in which case a message is written to stdout.
+ */
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
+    try{
+        ack_iterator record = unacked.begin();
+        while (record != unacked.end()) {
+            if (record->coveredBy(&acked)) {
+                record->discard(ctxt);
+            }
+            ++record;
+        }
+        return true;
+    }catch(...){
+        //nothing may escape: the exception specification is throw()
+        std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+        return false;
+    }
+}
+
+/**
+ * Erases from the unacked list every delivery record that is covered
+ * by the accumulated acks; records that are not covered are left in
+ * place and keep their relative order.
+ */
+void TxAck::commit() throw(){
+    std::list<DeliveryRecord>::iterator record = unacked.begin();
+    while (record != unacked.end()) {
+        if (record->coveredBy(&acked)) {
+            //erase() hands back the iterator following the removed node
+            record = unacked.erase(record);
+        } else {
+            ++record;
+        }
+    }
+}
+
+/**
+ * No-op: this implementation performs no work on rollback.
+ */
+void TxAck::rollback() throw()
+{
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <AccumulatedAck.h>
+#include <DeliveryRecord.h>
+#include <TxOp.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Defines the transactional behaviour for acks received by a
+         * transactional channel.
+         */
+        class TxAck : public TxOp{
+        private:
+            /** The accumulation of acks received (not owned). */
+            AccumulatedAck& acked;
+            /** The record of delivered messages (not owned). */
+            std::list<DeliveryRecord>& unacked;
+
+        public:
+            /**
+             * @param acked a representation of the accumulation of
+             * acks received
+             * @param unacked the record of delivered messages
+             */
+            TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+
+            /**
+             * @param ctxt the transaction context the prepare work
+             * is carried out under
+             * @returns true on success, false on failure
+             */
+            virtual bool prepare(TransactionContext* ctxt) throw();
+
+            /** Completes the operation after a successful prepare. */
+            virtual void commit() throw();
+
+            /** Abandons the operation. */
+            virtual void rollback() throw();
+
+            virtual ~TxAck(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxBuffer.h>
+
+using std::mem_fun;
+using namespace qpid::broker;
+
+/**
+ * Prepares each enlisted op in the order it was enlisted. If a store
+ * is supplied, a context is obtained from it and passed to every op;
+ * without a store the ops receive a null context.
+ *
+ * The store's context is aborted as soon as one op fails to prepare,
+ * and committed once every op has prepared.
+ *
+ * @returns true if all ops prepared, false at the first failure.
+ */
+bool TxBuffer::prepare(TransactionalStore* const store)
+{
+    std::auto_ptr<TransactionContext> ctxt;
+    if(store){
+        ctxt = store->begin();
+    }
+    TransactionContext* const context = ctxt.get();
+
+    for(op_iterator op = ops.begin(); op != ops.end(); ++op){
+        const bool prepared = (*op)->prepare(context);
+        if(prepared){
+            continue;
+        }
+        if(store){
+            store->abort(context);
+        }
+        return false;
+    }
+
+    if(store){
+        store->commit(context);
+    }
+    return true;
+}
+
+/**
+ * Calls commit() on each enlisted op in enlistment order, then
+ * empties the list of ops.
+ */
+void TxBuffer::commit()
+{
+    for(op_iterator op = ops.begin(); op != ops.end(); ++op){
+        (*op)->commit();
+    }
+    ops.clear();
+}
+
+/**
+ * Calls rollback() on each enlisted op in enlistment order, then
+ * empties the list of ops.
+ */
+void TxBuffer::rollback()
+{
+    for(op_iterator op = ops.begin(); op != ops.end(); ++op){
+        (*op)->rollback();
+    }
+    ops.clear();
+}
+
+/**
+ * Appends the op to the end of this transaction's list of ops. Only
+ * the pointer is stored.
+ */
+void TxBuffer::enlist(TxOp* const op)
+{
+    ops.insert(ops.end(), op);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include <TransactionalStore.h>
+#include <TxOp.h>
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ * 
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ * 
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ * 
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ * 
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+    namespace broker {
+        class TxBuffer{
+        private:
+            typedef std::vector<TxOp*>::iterator op_iterator;
+            /** The operations enlisted so far, in enlistment order. */
+            std::vector<TxOp*> ops;
+
+        public:
+            /**
+             * Requests that all ops are prepared. This should
+             * primarily involve making sure that a persistent record
+             * of the operations is stored where necessary.
+             *
+             * All ops will be prepared under a transaction on the
+             * specified store. If any operation fails on prepare,
+             * this transaction will be rolled back.
+             *
+             * Once prepared, a transaction can be committed (or in
+             * the 2pc case, rolled back).
+             *
+             * @returns true if all the operations prepared
+             * successfully, false if not.
+             */
+            bool prepare(TransactionalStore* const store);
+
+            /**
+             * Signals that all the ops prepared successfully and can
+             * now commit, i.e. the operations can now be fully
+             * carried out.
+             *
+             * Should only be called after a call to prepare() returns
+             * true.
+             */
+            void commit();
+
+            /**
+             * Rolls back all the operations.
+             *
+             * Should only be called either after a call to prepare()
+             * returns true (2pc) or instead of a prepare call
+             * ('server-local')
+             */
+            void rollback();
+
+            /**
+             * Adds an operation to the transaction.
+             */
+            void enlist(TxOp* const op);
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+#include <TransactionalStore.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Interface for a single operation taking part in a
+         * transaction. None of the three operations may throw.
+         */
+        class TxOp{
+        public:
+            /**
+             * @returns true if the operation prepared successfully,
+             * false otherwise.
+             */
+            virtual bool prepare(TransactionContext*) throw() = 0;
+
+            /** Carries the operation out. */
+            virtual void commit() throw() = 0;
+
+            /** Abandons the operation. */
+            virtual void rollback() throw() = 0;
+
+            virtual ~TxOp(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxOp.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxPublish.h>
+
+using namespace qpid::broker;
+
+/**
+ * Keeps a shared pointer to the message that is to be published.
+ */
+TxPublish::TxPublish(Message::shared_ptr _msg)
+    : msg(_msg)
+{
+}
+
+/**
+ * Enqueues the message, under the given transaction context and with
+ * a null xid, on every queue registered through deliverTo().
+ *
+ * @returns true if every enqueue completed; false if anything threw,
+ * in which case a message is written to stdout.
+ */
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+    try{
+        Prepare enqueueOn(ctxt, msg, 0);
+        for(std::list<Queue::shared_ptr>::iterator q = queues.begin(); q != queues.end(); ++q){
+            enqueueOn(*q);
+        }
+        return true;
+    }catch(...){
+        //nothing may escape: the exception specification is throw()
+        std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+        return false;
+    }
+}
+
+/**
+ * Passes the message to process() on every queue registered through
+ * deliverTo(), in registration order.
+ */
+void TxPublish::commit() throw(){
+    Commit processOn(msg);
+    for(std::list<Queue::shared_ptr>::iterator q = queues.begin(); q != queues.end(); ++q){
+        processOn(*q);
+    }
+}
+
+/**
+ * No-op: this implementation performs no work on rollback.
+ */
+void TxPublish::rollback() throw()
+{
+}
+
+/**
+ * Records the queue as a destination for the message. Nothing is
+ * delivered here; the queue is only appended to the list used by
+ * prepare() and commit().
+ */
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+    queues.insert(queues.end(), queue);
+}
+
+/**
+ * Captures the arguments that operator() forwards to each queue's
+ * enqueue(). The message is held by reference.
+ */
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt,
+                            Message::shared_ptr& _msg,
+                            const string* const _xid)
+    : ctxt(_ctxt),
+      msg(_msg),
+      xid(_xid)
+{
+}
+
+/**
+ * Enqueues the captured message on the given queue using the
+ * captured context and xid.
+ */
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue)
+{
+    queue->enqueue(ctxt, msg, xid);
+}
+
+/**
+ * Captures, by reference, the message that operator() hands to each
+ * queue.
+ */
+TxPublish::Commit::Commit(Message::shared_ptr& _msg)
+    : msg(_msg)
+{
+}
+
+/**
+ * Hands the captured message to the given queue's process().
+ */
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue)
+{
+    queue->process(msg);
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <Deliverable.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <TxOp.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Defines the behaviour for publish operations on a
+         * transactional channel. Messages are routed through
+         * exchanges when received but are not at that stage delivered
+         * to the matching queues, rather the queues are held in an
+         * instance of this class. On prepare() the message is marked
+         * enqueued to the relevant queues in the MessageStore. On
+         * commit() the messages will be passed to the queue for
+         * dispatch or to be added to the in-memory queue.
+         */
+        class TxPublish : public TxOp, public Deliverable{
+        private:
+            /**
+             * Functor applied to each queue by prepare(): calls
+             * enqueue() on the queue with the stored arguments.
+             */
+            class Prepare{
+                TransactionContext* ctxt;
+                Message::shared_ptr& msg;
+                const std::string* const xid;
+            public:
+                Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid);
+                void operator()(Queue::shared_ptr& queue);
+            };
+
+            /**
+             * Functor applied to each queue by commit(): calls
+             * process() on the queue with the stored message.
+             */
+            class Commit{
+                Message::shared_ptr& msg;
+            public:
+                Commit(Message::shared_ptr& msg);
+                void operator()(Queue::shared_ptr& queue);
+            };
+
+            /** The message being published. */
+            Message::shared_ptr msg;
+            /** The queues collected through deliverTo(). */
+            std::list<Queue::shared_ptr> queues;
+
+        public:
+            TxPublish(Message::shared_ptr msg);
+
+            virtual bool prepare(TransactionContext* ctxt) throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+
+            /** Adds the queue to the list of destinations. */
+            virtual void deliverTo(Queue::shared_ptr& queue);
+
+            virtual ~TxPublish(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientChannel.h>
+#include <sys/Monitor.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <MethodBodyInstances.h>
+
+using namespace boost;          //to use dynamic_pointer_cast
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+/**
+ * Creates a channel that is not yet attached to a connection: id,
+ * con, out and incoming start out as 0 and the channel is marked
+ * closed.
+ *
+ * @param _transactional stored in 'transactional'; when set, setQos()
+ * additionally sends tx.select
+ * @param _prefetch stored in 'prefetch'; sent to the broker by
+ * setQos()
+ */
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+    id(0),
+    con(0),
+    out(0),
+    incoming(0),
+    closed(true),
+    prefetch(_prefetch),
+    transactional(_transactional),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+    version(8, 0)
+{
+    //retrieve() loops while 'retrieved' is 0, so the pointer has to
+    //start out as 0 rather than indeterminate. It is assigned here
+    //rather than in the initialiser list because the member
+    //declaration order is not visible from this file.
+    retrieved = 0;
+}
+
+/**
+ * Stops the channel (see stop()) before the object goes away.
+ */
+Channel::~Channel()
+{
+    stop();
+}
+
+/**
+ * Records the new prefetch value and, if both the connection and the
+ * output handler have been set, pushes it out through setQos().
+ */
+void Channel::setPrefetch(u_int16_t _prefetch){
+    prefetch = _prefetch;
+    if(con == 0 || out == 0){
+        //not attached yet: the value is only stored
+        return;
+    }
+    setQos();
+}
+
+/**
+ * Sends basic.qos carrying the current prefetch value and waits for
+ * basic.qos-ok. For a transactional channel it then sends tx.select
+ * and waits for tx.select-ok.
+ */
+void Channel::setQos(){
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+    AMQFrame* qos = new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false));
+    sendAndReceive(qos, method_bodies.basic_qos_ok);
+    if(!transactional){
+        return;
+    }
+    AMQFrame* txSelect = new AMQFrame(id, new TxSelectBody(version));
+    sendAndReceive(txSelect, method_bodies.tx_select_ok);
+}
+
+/**
+ * Sends exchange.declare for the exchange's name and type with an
+ * empty argument table.
+ *
+ * @param synch when true, waits for exchange.declare-ok; when false
+ * the frame is sent without waiting (the negation of synch is passed
+ * to the body, presumably as its nowait flag).
+ */
+void Channel::declareExchange(Exchange& exchange, bool synch){
+    string name = exchange.getName();
+    string type = exchange.getType();
+    FieldTable args;
+    AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
+    if(!synch){
+        out->send(frame);
+        return;
+    }
+    sendAndReceive(frame, method_bodies.exchange_declare_ok);
+}
+
+/**
+ * Sends exchange.delete for the named exchange.
+ *
+ * @param synch when true, waits for exchange.delete-ok; when false
+ * the frame is sent without waiting.
+ */
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+    string name = exchange.getName();
+    AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
+    if(!synch){
+        out->send(frame);
+        return;
+    }
+    sendAndReceive(frame, method_bodies.exchange_delete_ok);
+}
+
+/**
+ * Sends queue.declare using the queue's name and its exclusive and
+ * auto-delete settings, with an empty argument table.
+ *
+ * @param synch when true, waits for queue.declare-ok and, if the
+ * queue had an empty name, copies the queue name from the response
+ * into the queue object; when false the frame is sent without
+ * waiting.
+ */
+void Channel::declareQueue(Queue& queue, bool synch){
+    string name = queue.getName();
+    FieldTable args;
+    AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
+                                                            queue.isExclusive(),
+                                                            queue.isAutoDelete(), !synch, args));
+    if(!synch){
+        out->send(frame);
+        return;
+    }
+
+    sendAndReceive(frame, method_bodies.queue_declare_ok);
+    if(queue.getName().length() != 0){
+        return;
+    }
+    //unnamed queue: adopt the name carried by the declare-ok response
+    QueueDeclareOkBody::shared_ptr response =
+        dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+    queue.setName(response->getQueue());
+}
+
+/**
+ * Sends queue.delete for the named queue.
+ *
+ * @param ifunused passed through to the method body
+ * @param ifempty passed through to the method body
+ * @param synch when true, waits for queue.delete-ok; when false the
+ * frame is sent with nowait set and no response is awaited.
+ */
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+    string name = queue.getName();
+    //body arguments after the version: ticket, queue, ifunused, ifempty, nowait
+    AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+    if(!synch){
+        out->send(frame);
+        return;
+    }
+    sendAndReceive(frame, method_bodies.queue_delete_ok);
+}
+
+/**
+ * Sends queue.bind for the given queue and exchange names, with the
+ * supplied key and arguments.
+ *
+ * @param synch when true, waits for queue.bind-ok; when false the
+ * frame is sent without waiting.
+ */
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+    string exchangeName = exchange.getName();
+    string queueName = queue.getName();
+    AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, queueName, exchangeName, key,!synch, args));
+    if(!synch){
+        out->send(frame);
+        return;
+    }
+    sendAndReceive(frame, method_bodies.queue_bind_ok);
+}
+
+/**
+ * Sends basic.consume for the queue and registers a consumer record
+ * under the consumer tag.
+ *
+ * @param tag consumer tag sent with the request; when synch is true
+ * it is overwritten with the tag carried by basic.consume-ok
+ * @param listener stored in the consumer record
+ * @param ackMode stored in the consumer record; the request's no-ack
+ * argument is set when this equals NO_ACK
+ * @param noLocal passed through to the method body
+ * @param synch when true, waits for basic.consume-ok; when false the
+ * frame is sent without waiting.
+ */
+void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
+                      int ackMode, bool noLocal, bool synch){
+
+    string queueName = queue.getName();
+    AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, queueName, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+    if(!synch){
+        out->send(frame);
+    }else{
+        sendAndReceive(frame, method_bodies.basic_consume_ok);
+        BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+        tag = response->getConsumerTag();
+    }
+
+    //the record is keyed by the (possibly just updated) tag
+    Consumer* consumer = new Consumer();
+    consumer->listener = listener;
+    consumer->ackMode = ackMode;
+    consumer->lastDeliveryTag = 0;
+    consumers[tag] = consumer;
+}
+
+/**
+ * Cancels the consumer registered under the given tag.
+ *
+ * If a consumer record exists, uses LAZY_ACK and has a non-zero last
+ * delivery tag, a basic.ack for that tag is sent first. basic.cancel
+ * is then sent (waiting for basic.cancel-ok when synch is true), and
+ * finally the record is removed and deleted.
+ *
+ * A tag with no registered consumer is tolerated: the lookup does not
+ * insert anything and the missing record is never dereferenced; the
+ * basic.cancel request is still sent.
+ */
+void Channel::cancel(std::string& tag, bool synch){
+    //find() rather than operator[]: the latter would insert a null
+    //entry for an unknown tag
+    consumer_iterator found = consumers.find(tag);
+    Consumer* c = (found == consumers.end()) ? 0 : found->second;
+    if(c != 0 && c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
+        out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+    }
+
+    AMQFrame*  frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
+    if(synch){
+        sendAndReceive(frame, method_bodies.basic_cancel_ok);
+    }else{
+        out->send(frame);
+    }
+    consumers.erase(tag);
+    //deleting a null pointer is a no-op, so no guard is needed
+    delete c;
+}
+
+/**
+ * Removes and deletes every registered consumer record. For a record
+ * using LAZY_ACK or AUTO_ACK with a non-zero last delivery tag, a
+ * basic.ack for that tag is sent before the record is removed. No
+ * basic.cancel is sent.
+ */
+void Channel::cancelAll(){
+    //each pass erases the first entry, so restart from begin() every time
+    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
+        Consumer* c = i->second;
+        if(c != 0 && (c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
+            //pass the protocol version first, as cancel() does when it
+            //builds the same body
+            out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+        }
+        consumers.erase(i);
+        delete c;
+    }
+}
+
+/**
+ * Waits on the retrieval monitor until a retrieved message is
+ * available, copies its header, delivery tag and data into msg, then
+ * deletes it and clears the 'retrieved' slot.
+ */
+void Channel::retrieve(Message& msg){
+    Monitor::ScopedLock guard(retrievalMonitor);
+    while(!retrieved){
+        retrievalMonitor.wait();
+    }
+
+    IncomingMessage* const response = retrieved;
+    msg.header = response->getHeader();
+    msg.deliveryTag = response->getDeliveryTag();
+    response->getData(msg.data);
+    delete response;
+    retrieved = 0;
+}
+
+/**
+ * Sends basic.get for the named queue and waits for the response.
+ *
+ * @param msg filled in from the retrieved message when one is
+ * returned
+ * @param ackMode the request's no-ack argument is set when this
+ * equals NO_ACK
+ * @returns true if the response was basic.get-ok and msg was filled
+ * in; false if the response was basic.get-empty.
+ *
+ * Raises a protocol error (via THROW_QPID_ERROR) if a previous
+ * incoming message is still incomplete or if the response is neither
+ * get-ok nor get-empty.
+ */
+bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+    string name = queue.getName();
+    //Derive the flag from the mode exactly as consume() does, instead
+    //of letting the raw mode value convert to bool.
+    //NOTE(review): assumes the last BasicGetBody argument is the AMQP
+    //no-ack bit - confirm against the generated body class.
+    AMQFrame*  frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode == NO_ACK));
+    responses.expect();
+    out->send(frame);
+    responses.waitForResponse();
+    AMQMethodBody::shared_ptr response = responses.getResponse();
+    if(method_bodies.basic_get_ok.match(response.get())){
+        if(incoming != 0){
+            std::cout << "Existing message not complete" << std::endl;
+            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+        }else{
+            incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
+        }
+        //blocks until the header and content frames have been received
+        retrieve(msg);
+        return true;
+    }else if(method_bodies.basic_get_empty.match(response.get())){
+        return false;
+    }else{
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+    }
+}
+
+    
+/**
+ * Publishes a message: sends basic.publish for the exchange and
+ * routing key, then the header frame (after setting its content size
+ * to the length of the data), then the data itself in one or more
+ * content frames.
+ *
+ * Data shorter than the connection's maximum frame size less 8 bytes
+ * goes out as a single content frame; otherwise it is cut into
+ * consecutive fragments of at most that size. No content frame is
+ * sent for empty data.
+ */
+void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+    string exchangeName = exchange.getName();
+    string key = routingKey;
+    out->send(new AMQFrame(id, new BasicPublishBody(version, 0, exchangeName, key, mandatory, immediate)));
+
+    //header frame, carrying the total content size
+    string data = msg.getData();
+    msg.header->setContentSize(data.length());
+    AMQBody::shared_ptr header(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
+    out->send(new AMQFrame(id, header));
+
+    u_int64_t total = data.length();
+    if(total == 0){
+        return;
+    }
+
+    u_int32_t maxChunk = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+    if(total < maxChunk){
+        out->send(new AMQFrame(id, new AMQContentBody(data)));
+        return;
+    }
+
+    u_int32_t sent = 0;
+    u_int32_t left = total - sent;
+    while(left > 0){
+        u_int32_t chunk = left > maxChunk ? maxChunk : left;
+        string piece(data.substr(sent, chunk));
+        out->send(new AMQFrame(id, new AMQContentBody(piece)));
+
+        sent += chunk;
+        left = total - sent;
+    }
+}
+    
+/**
+ * Sends tx.commit and waits for tx.commit-ok.
+ */
+void Channel::commit(){
+    sendAndReceive(new AMQFrame(id, new TxCommitBody(version)), method_bodies.tx_commit_ok);
+}
+
+/**
+ * Sends tx.rollback and waits for tx.rollback-ok.
+ */
+void Channel::rollback(){
+    sendAndReceive(new AMQFrame(id, new TxRollbackBody(version)), method_bodies.tx_rollback_ok);
+}
+    
+/**
+ * Handles a method frame received on this channel. The cases are
+ * tried in this order:
+ *  - a response is being waited for: the body is handed to the waiter
+ *  - basic.deliver / basic.return: a new incoming message is started
+ *    (protocol error if the previous one is not yet complete)
+ *  - channel.close: the channel is removed from its connection
+ *  - channel.flow: ignored
+ *  - anything else: reported on stdout and raised as a protocol error
+ */
+void Channel::handleMethod(AMQMethodBody::shared_ptr body){
+    if(responses.isWaiting()){
+        responses.signalResponse(body);
+        return;
+    }
+
+    if(method_bodies.basic_deliver.match(body.get())){
+        if(incoming != 0){
+            std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
+            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+        }else{
+            incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
+        }
+        return;
+    }
+
+    if(method_bodies.basic_return.match(body.get())){
+        if(incoming != 0){
+            std::cout << "Existing message not complete" << std::endl;
+            THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+        }else{
+            incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
+        }
+        return;
+    }
+
+    if(method_bodies.channel_close.match(body.get())){
+        con->removeChannel(this);
+        //need to signal application that channel has been closed through exception
+        return;
+    }
+
+    if(method_bodies.channel_flow.match(body.get())){
+        //nothing is done for channel.flow
+        return;
+    }
+
+    //signal error
+    std::cout << "Unhandled method: " << *body << std::endl;
+    THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+}
+    
+/**
+ * Attaches a content header to the message currently being assembled
+ * and queues the message if that completes it. A header that arrives
+ * when no deliver/return is in progress is a protocol error.
+ */
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+    if(incoming != 0){
+        incoming->setHeader(body);
+        if(incoming->isComplete()){
+            enqueue();
+        }
+    }else{
+        //handle invalid frame sequence
+        std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
+    }
+}
+    
+/**
+ * Adds a content frame to the message currently being assembled and
+ * queues the message if that completes it. Content that arrives when
+ * no deliver/return is in progress is a protocol error.
+ */
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+    if(incoming != 0){
+        incoming->addContent(body);
+        if(incoming->isComplete()){
+            enqueue();
+        }
+    }else{
+        //handle invalid frame sequence
+        std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
+    }
+}
+    
+/**
+ * Heartbeat frames are not accepted on a channel; receiving one is
+ * always reported as a protocol error.
+ */
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/)
+{
+    THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+/**
+ * Creates the dispatcher thread with this channel as its Runnable.
+ */
+void Channel::start()
+{
+    dispatcher = Thread(this);
+}
+
+/**
+ * Marks the channel as closed, notifies the dispatch monitor and then
+ * joins the dispatcher thread.
+ */
+void Channel::stop(){
+    {
+        //inner scope: the lock is released at the closing brace, i.e. before
+        //join(). dequeue() waits on dispatchMonitor and has to re-acquire it
+        //in order to observe 'closed' and return.
+        Monitor::ScopedLock l(dispatchMonitor);
+        closed = true;
+        dispatchMonitor.notify();
+    }
+    dispatcher.join();        
+}
+
+/**
+ * Runnable entry point: runs the dispatch loop on the calling thread.
+ */
+void Channel::run()
+{
+    dispatch();
+}
+
+/**
+ * Hands over the completed incoming message. A response is stored in
+ * 'retrieved'; anything else is pushed onto the dispatch queue. In both
+ * cases the matching monitor is notified and 'incoming' is cleared.
+ */
+void Channel::enqueue(){
+    if(!incoming->isResponse()){
+        //delivered or returned message: queue it for dispatch()
+        Monitor::ScopedLock l(dispatchMonitor);
+        messages.push(incoming);
+        dispatchMonitor.notify();
+    }else{
+        //response message: store it and notify the retrieval monitor
+        Monitor::ScopedLock l(retrievalMonitor);
+        retrieved = incoming;
+        retrievalMonitor.notify();
+    }
+    incoming = 0;
+}
+
+/**
+ * Waits on the dispatch monitor until a message is queued or the
+ * channel is closed.
+ *
+ * @return the oldest queued message, or 0 if the queue is empty once
+ *         the wait ends (which only happens when the channel is closed).
+ */
+IncomingMessage* Channel::dequeue(){
+    Monitor::ScopedLock l(dispatchMonitor);
+    while(!closed && messages.empty()){
+        dispatchMonitor.wait();
+    }
+    if(messages.empty()){
+        return 0;
+    }
+    IncomingMessage* next = messages.front();
+    messages.pop();
+    return next;
+}
+
+/**
+ * Passes a delivered message to the consumer's listener and then sends
+ * whatever acknowledgement the consumer's ack mode calls for.
+ *
+ * AUTO_ACK acknowledges every message individually. LAZY_ACK sends one
+ * 'multiple' ack once 'prefetch' messages have been counted and then
+ * starts counting the next batch from zero. For any other mode nothing
+ * is sent from here.
+ *
+ * @param consumer the consumer the message was delivered for
+ * @param msg      the message to hand to the consumer's listener
+ */
+void Channel::deliver(Consumer* consumer, Message& msg){
+    //record delivery tag:
+    consumer->lastDeliveryTag = msg.getDeliveryTag();
+
+    //allow registered listener to handle the message
+    consumer->listener->received(msg);
+
+    //if the handler calls close on the channel or connection while
+    //handling this message, then consumer will now have been deleted.
+    if(!closed){
+        bool multiple(false);
+        switch(consumer->ackMode){
+        case LAZY_ACK: 
+            multiple = true;
+            if(++(consumer->count) < prefetch) break;
+            //the batch is full: restart the count, otherwise every later
+            //message would be acked individually and the counter would
+            //keep growing
+            consumer->count = 0;
+            //then drop through to send the ack
+        case AUTO_ACK:
+            out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+            consumer->lastDeliveryTag = 0;
+        }
+    }
+
+    //as it stands, transactionality is entirely orthogonal to ack
+    //mode, though the acks will not be processed by the broker under
+    //a transaction until it commits.
+}
+
+/**
+ * Dispatch loop: takes messages from the queue until the channel is
+ * closed. Returned messages go to the returned-message handler (or are
+ * printed when none is set); delivered messages go to the consumer
+ * registered under the message's consumer tag.
+ */
+void Channel::dispatch(){
+    while(!closed){
+        IncomingMessage* incomingMsg = dequeue();
+        if(incomingMsg){
+            //Note: msg is currently only valid for duration of this call
+            Message msg(incomingMsg->getHeader());
+            incomingMsg->getData(msg.data);
+            if(incomingMsg->isReturn()){
+                if(returnsHandler == 0){
+                    //print warning to log/console
+                    std::cout << "Message returned: " << msg.getData() << std::endl;
+                }else{
+                    returnsHandler->returned(msg);
+                }
+            }else{
+                msg.deliveryTag = incomingMsg->getDeliveryTag();
+                std::string tag = incomingMsg->getConsumerTag();
+                
+                //use find() rather than operator[]: the latter would insert
+                //a null entry into the map for every unknown tag
+                consumer_iterator known = consumers.find(tag);
+                if(known == consumers.end() || known->second == 0){
+                    //signal error
+                    std::cout << "Unknown consumer: " << tag << std::endl;
+                }else{
+                    deliver(known->second, msg);
+                }
+            }
+            delete incomingMsg;
+        }
+    }
+}
+
+/**
+ * Sets the handler that dispatch() passes returned messages to.
+ * With a null handler, returned messages are printed to stdout instead.
+ */
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler)
+{
+    returnsHandler = handler;
+}
+
+/**
+ * Sends 'frame' and then calls responses.receive() with the method
+ * body expected in reply.
+ * NOTE(review): receive() presumably blocks until handleMethod() passes
+ * the reply to responses.signalResponse() - confirm in ResponseHandler.
+ *
+ * @param frame the request frame to send
+ * @param body  the method body expected as the response
+ */
+void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+    //expect() is called before the frame goes out; handleMethod() only
+    //routes a method to 'responses' while responses.isWaiting() is true
+    //(presumably expect() is what puts it into that state - TODO confirm)
+    responses.expect();
+    out->send(frame);
+    responses.receive(body);
+}
+
+/**
+ * Asks the owning connection to close this channel. Does nothing when
+ * the channel has no connection.
+ */
+void Channel::close(){
+    if(con == 0){
+        return;
+    }
+    con->closeChannel(this);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <queue>
+#include "sys/types.h"
+
+#ifndef _Channel_
+#define _Channel_
+
+#include <framing/amqp_framing.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <IncomingMessage.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <ResponseHandler.h>
+#include <ReturnedMessageHandler.h>
+
+namespace qpid {
+namespace client {
+    /**
+     * Acknowledgement modes for a consumer. In Channel::deliver(),
+     * AUTO_ACK sends a basic.ack after each message and LAZY_ACK sends
+     * a 'multiple' basic.ack once the prefetch count is reached; for
+     * NO_ACK and CLIENT_ACK deliver() sends no ack itself.
+     */
+    enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
+
+    /**
+     * Client-side channel. It receives frames through the BodyHandler
+     * callbacks, assembles them into messages and dispatches those to
+     * registered listeners, either on its own thread (start()) or on
+     * the caller's thread (run()).
+     */
+    class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
+        //per-consumer state used by deliver()
+        struct Consumer{
+            MessageListener* listener;  //receives each delivered message
+            int ackMode;                //one of ack_modes
+            int count;                  //messages counted towards the next LAZY_ACK ack
+            u_int64_t lastDeliveryTag;  //tag of the last delivery; set to 0 once acked
+        };
+        typedef std::map<std::string,Consumer*>::iterator consumer_iterator; 
+
+	u_int16_t id;
+	Connection* con;
+	//thread created by start()
+	qpid::sys::Thread dispatcher;
+	qpid::framing::OutputHandler* out;
+	//message currently being assembled from deliver/return, header and content frames
+	IncomingMessage* incoming;
+	ResponseHandler responses;
+	std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+	IncomingMessage* retrieved;//holds response to basic.get
+	qpid::sys::Monitor dispatchMonitor;
+	qpid::sys::Monitor retrievalMonitor;
+	//registered consumers, keyed by consumer tag
+	std::map<std::string, Consumer*> consumers;
+	ReturnedMessageHandler* returnsHandler;
+	//set by stop(); ends the dispatch loop
+	bool closed;
+
+        u_int16_t prefetch;
+        const bool transactional;
+    qpid::framing::ProtocolVersion version;
+
+	void enqueue();
+	void retrieve(Message& msg);
+	IncomingMessage* dequeue();
+	void dispatch();
+	void stop();
+	void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);            
+        void deliver(Consumer* consumer, Message& msg);
+        void setQos();
+	void cancelAll();
+
+	//BodyHandler callbacks, one per frame body type
+	virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+	virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+	virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+	virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    public:
+	Channel(bool transactional = false, u_int16_t prefetch = 500);
+	~Channel();
+
+	void declareExchange(Exchange& exchange, bool synch = true);
+	void deleteExchange(Exchange& exchange, bool synch = true);
+	void declareQueue(Queue& queue, bool synch = true);
+	void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+	void bind(const Exchange& exchange, const Queue& queue, const std::string& key, 
+                  const qpid::framing::FieldTable& args, bool synch = true);
+        void consume(Queue& queue, std::string& tag, MessageListener* listener, 
+                     int ackMode = NO_ACK, bool noLocal = false, bool synch = true);
+	void cancel(std::string& tag, bool synch = true);
+        bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+        void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, 
+                     bool mandatory = false, bool immediate = false);
+
+        //send tx.commit / tx.rollback and expect the matching -ok method in reply
+        void commit();
+        void rollback();
+
+        void setPrefetch(u_int16_t prefetch);
+
+	/**
+	 * Start message dispatching on a new thread
+	 */
+	void start();
+	/**
+	 * Do message dispatching on this thread
+	 */
+	void run();
+
+        //asks the owning connection, if any, to close this channel
+        void close();
+
+	void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+        friend class Connection;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientExchange.h>
+
+// Creates an exchange descriptor holding the given name and type.
+qpid::client::Exchange::Exchange(std::string _name, std::string _type)
+    : name(_name),
+      type(_type)
+{
+}
+
+// Returns the name given at construction.
+const std::string& qpid::client::Exchange::getName() const
+{
+    return name;
+}
+
+// Returns the type given at construction.
+const std::string& qpid::client::Exchange::getType() const
+{
+    return type;
+}
+
+// Exchange type names.
+const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
+const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
+const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
+
+// Predefined exchanges. They must stay below the type-name strings above:
+// those strings are passed to the constructors here, and within one
+// translation unit such objects are initialized in definition order.
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+    /**
+     * Immutable description of an exchange: a name and a type.
+     */
+    class Exchange{
+	const std::string name;
+	const std::string type;
+
+    public:
+
+	// Exchange type names ("direct", "topic", "headers").
+	static const std::string DIRECT_EXCHANGE;
+	static const std::string TOPIC_EXCHANGE;
+	static const std::string HEADERS_EXCHANGE;
+
+	// Predefined exchanges "amq.direct", "amq.topic" and "amq.headers".
+	static const Exchange DEFAULT_DIRECT_EXCHANGE;
+	static const Exchange DEFAULT_TOPIC_EXCHANGE;
+	static const Exchange DEFAULT_HEADERS_EXCHANGE;
+
+	// The type defaults to DIRECT_EXCHANGE when omitted.
+	Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+	const std::string& getName() const;
+	const std::string& getType() const;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientMessage.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+/**
+ * Creates an empty message with a new BASIC content header.
+ * 'redelivered' and 'deliveryTag' are given defined initial values so
+ * that isRedelivered() and getDeliveryTag() never read indeterminate
+ * members.
+ */
+Message::Message() : redelivered(false), deliveryTag(0){
+    header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
+}
+
+/**
+ * Creates a message that shares the given content header.
+ * 'redelivered' and 'deliveryTag' are given defined initial values so
+ * that isRedelivered() and getDeliveryTag() never read indeterminate
+ * members.
+ */
+Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header), redelivered(false), deliveryTag(0){
+}
+
+// Nothing to release explicitly; the members clean up after themselves.
+Message::~Message()
+{
+}
+	
+/**
+ * Returns the header's properties viewed as BasicHeaderProperties.
+ * The result is null if the properties object is not of that type,
+ * since dynamic_cast on a pointer yields null on a failed cast; the
+ * accessors in this file dereference it without checking.
+ */
+BasicHeaderProperties* Message::getHeaderProperties(){
+    return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+// Property getters: each one forwards to the same-named getter on the
+// message's BasicHeaderProperties.
+
+const std::string& Message::getContentType()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getContentType();
+}
+
+const std::string& Message::getContentEncoding()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getContentEncoding();
+}
+
+FieldTable& Message::getHeaders()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getHeaders();
+}
+
+u_int8_t Message::getDeliveryMode()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getDeliveryMode();
+}
+
+u_int8_t Message::getPriority()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getPriority();
+}
+
+const std::string& Message::getCorrelationId()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getCorrelationId();
+}
+
+const std::string& Message::getReplyTo()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getReplyTo();
+}
+
+const std::string& Message::getExpiration()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getExpiration();
+}
+
+const std::string& Message::getMessageId()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getMessageId();
+}
+
+u_int64_t Message::getTimestamp()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getTimestamp();
+}
+
+const std::string& Message::getType()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getType();
+}
+
+const std::string& Message::getUserId()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getUserId();
+}
+
+const std::string& Message::getAppId()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getAppId();
+}
+
+const std::string& Message::getClusterId()
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    return props->getClusterId();
+}
+
+// Property setters: each one forwards its argument to the same-named
+// setter on the message's BasicHeaderProperties.
+
+void Message::setContentType(const std::string& type)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setContentType(type);
+}
+
+void Message::setContentEncoding(const std::string& encoding)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setContentEncoding(encoding);
+}
+
+void Message::setHeaders(const FieldTable& headers)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setHeaders(headers);
+}
+
+void Message::setDeliveryMode(u_int8_t mode)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setDeliveryMode(mode);
+}
+
+void Message::setPriority(u_int8_t priority)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setPriority(priority);
+}
+
+void Message::setCorrelationId(const std::string& correlationId)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setCorrelationId(correlationId);
+}
+
+void Message::setReplyTo(const std::string& replyTo)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setReplyTo(replyTo);
+}
+
+void Message::setExpiration(const std::string&  expiration)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setExpiration(expiration);
+}
+
+void Message::setMessageId(const std::string& messageId)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setMessageId(messageId);
+}
+
+void Message::setTimestamp(u_int64_t timestamp)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setTimestamp(timestamp);
+}
+
+void Message::setType(const std::string& type)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setType(type);
+}
+
+void Message::setUserId(const std::string& userId)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setUserId(userId);
+}
+
+void Message::setAppId(const std::string& appId)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setAppId(appId);
+}
+
+void Message::setClusterId(const std::string& clusterId)
+{
+    BasicHeaderProperties* const props = getHeaderProperties();
+    props->setClusterId(clusterId);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <framing/amqp_framing.h>
+
+#ifndef _Message_
+#define _Message_
+
+
+namespace qpid {
+namespace client {
+
+    /**
+     * A message as seen by client code: a content header holding the
+     * basic properties, the body data, and delivery information.
+     * Channel is a friend; it constructs messages from received headers
+     * and fills in 'data' and 'deliveryTag' directly.
+     */
+    class Message{
+	qpid::framing::AMQHeaderBody::shared_ptr header;
+        std::string data;
+	bool redelivered;
+        u_int64_t deliveryTag;
+
+        // Header properties cast to BasicHeaderProperties (null if the cast fails).
+        qpid::framing::BasicHeaderProperties* getHeaderProperties();
+	// Private: only Channel (a friend) wraps an existing header.
+	Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+    public:
+	Message();
+	~Message();
+	
+	// Note: returns a copy of the body data.
+	inline std::string getData(){ return data; }
+	inline void setData(const std::string& _data){ data = _data; }
+
+	inline bool isRedelivered(){ return redelivered; }
+	inline void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
+
+        inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+
+        // Getters for the basic header properties.
+        const std::string& getContentType();
+        const std::string& getContentEncoding();
+        qpid::framing::FieldTable& getHeaders();
+        u_int8_t getDeliveryMode();
+        u_int8_t getPriority();
+        const std::string& getCorrelationId();
+        const std::string& getReplyTo();
+        const std::string& getExpiration();
+        const std::string& getMessageId();
+        u_int64_t getTimestamp();
+        const std::string& getType();
+        const std::string& getUserId();
+        const std::string& getAppId();
+        const std::string& getClusterId();
+
+	// Setters for the basic header properties.
+	void setContentType(const std::string& type);
+	void setContentEncoding(const std::string& encoding);
+	void setHeaders(const qpid::framing::FieldTable& headers);
+	void setDeliveryMode(u_int8_t mode);
+	void setPriority(u_int8_t priority);
+	void setCorrelationId(const std::string& correlationId);
+	void setReplyTo(const std::string& replyTo);
+	void setExpiration(const std::string&  expiration);
+	void setMessageId(const std::string& messageId);
+	void setTimestamp(u_int64_t timestamp);
+	void setType(const std::string& type);
+	void setUserId(const std::string& userId);
+	void setAppId(const std::string& appId);
+	void setClusterId(const std::string& clusterId);
+
+
+	friend class Channel;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientQueue.h>
+
+// Unnamed queue: both auto-delete and exclusive.
+qpid::client::Queue::Queue()
+    : name(""),
+      autodelete(true),
+      exclusive(true)
+{
+}
+
+// Named queue: neither auto-delete nor exclusive.
+qpid::client::Queue::Queue(std::string _name)
+    : name(_name),
+      autodelete(false),
+      exclusive(false)
+{
+}
+
+// Named queue where 'temp' sets both the auto-delete and exclusive flags.
+qpid::client::Queue::Queue(std::string _name, bool temp)
+    : name(_name),
+      autodelete(temp),
+      exclusive(temp)
+{
+}
+
+// Named queue with each flag given explicitly.
+qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive)
+    : name(_name),
+      autodelete(_autodelete),
+      exclusive(_exclusive)
+{
+}
+
+// Current name of the queue.
+const std::string& qpid::client::Queue::getName() const
+{
+    return name;
+}
+
+// Replaces the queue's name.
+void qpid::client::Queue::setName(const std::string& _name)
+{
+    name = _name;
+}
+
+// Auto-delete flag fixed at construction.
+bool qpid::client::Queue::isAutoDelete() const
+{
+    return autodelete;
+}
+
+// Exclusive flag fixed at construction.
+bool qpid::client::Queue::isExclusive() const
+{
+    return exclusive;
+}
+
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Queue_
+#define _Queue_
+
+namespace qpid {
+namespace client {
+
+    /**
+     * Description of a queue: a name that can be changed plus
+     * auto-delete and exclusive flags that are fixed at construction.
+     */
+    class Queue{
+	std::string name;
+        const bool autodelete;
+        const bool exclusive;
+
+    public:
+
+	// Empty name; auto-delete and exclusive are both true.
+	Queue();
+	// Auto-delete and exclusive are both false.
+	Queue(std::string name);
+	// 'temp' sets both auto-delete and exclusive.
+	Queue(std::string name, bool temp);
+	Queue(std::string name, bool autodelete, bool exclusive);
+	const std::string& getName() const;
+	void setName(const std::string&);
+        bool isAutoDelete() const;
+        bool isExclusive() const;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/ClientQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Connection.h>
+#include <ClientChannel.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <iostream>
+#include <MethodBodyInstances.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::sys;
+
+u_int16_t Connection::channelIdCounter;
+
+/**
+ * Creates an unconnected Connection. The connection starts in the closed
+ * state and owns a newly allocated Connector configured with the same
+ * debug flag and frame size. 'port' and 'out' are given defined initial
+ * values so that they are never read uninitialised before open() runs.
+ */
+Connection::Connection(bool debug, u_int32_t _max_frame_size) :
+    port(0),
+    max_frame_size(_max_frame_size),
+    out(0),
+    closed(true),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+    version(8, 0)
+{
+    connector = new Connector(debug, _max_frame_size);
+}
+
+/**
+ * Releases the Connector allocated in the constructor.
+ * NOTE(review): this does not call close(); presumably callers are expected
+ * to close the connection before destroying it - confirm.
+ */
+Connection::~Connection(){
+    delete connector;
+}
+
+/**
+ * Connects to the broker at _host:_port and runs the connection handshake
+ * in order: protocol initiation, connection.start / start-ok (PLAIN
+ * mechanism, response built from uid and pwd), connection.tune / tune-ok
+ * and connection.open for the given virtual host. The heartbeat proposed
+ * by the broker configures the connector's read timeout (twice the
+ * heartbeat) and write timeout (the heartbeat).
+ *
+ * On success the connection is marked as open, so that a later close()
+ * actually performs the orderly shutdown. If the reply to connection.open
+ * is neither open-ok nor redirect, a PROTOCOL_ERROR is raised through
+ * THROW_QPID_ERROR and the connection stays marked as closed.
+ */
+void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+    host = _host;
+    port = _port;
+    connector->setInputHandler(this);
+    connector->setTimeoutHandler(this);
+    connector->setShutdownHandler(this);
+    out = connector->getOutputHandler();
+    connector->connect(host, port);
+
+    ProtocolInitiation* header = new ProtocolInitiation(8, 0);
+    responses.expect();
+    connector->init(header);
+    responses.receive(method_bodies.connection_start);
+
+    FieldTable props;
+    string mechanism("PLAIN");
+    //PLAIN response layout: NUL, user id, NUL, password
+    string response = ((char)0) + uid + ((char)0) + pwd;
+    string locale("en_US");
+    responses.expect();
+    out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
+
+    /**
+     * Assume for now that further challenges will not be required
+    //receive connection.secure
+    responses.receive(connection_secure));
+    //send connection.secure-ok
+    out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+    **/
+
+    responses.receive(method_bodies.connection_tune);
+
+    ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
+    out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+
+    u_int16_t heartbeat = proposal->getHeartbeat();
+    connector->setReadTimeout(heartbeat * 2);
+    connector->setWriteTimeout(heartbeat);
+
+    //send connection.open
+    string capabilities;
+    string vhost = virtualhost;
+    responses.expect();
+    out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
+    //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
+    responses.waitForResponse();
+    if(responses.validate(method_bodies.connection_open_ok)){
+        //ok
+    }else if(responses.validate(method_bodies.connection_redirect)){
+        //ignore for now
+        ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
+        std::cout << "Received redirection to " << redirect->getHost() << std::endl;
+    }else{
+        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+    }
+
+    //handshake finished: from now on close() must really close the connection
+    closed = false;
+}
+
+/**
+ * Closes an open connection: sends connection.close with reply code 200,
+ * waits for connection.close-ok, closes the connector and marks the
+ * connection as closed. Does nothing when the connection is not open, so
+ * repeated calls are harmless.
+ */
+void Connection::close(){
+    if(!closed){
+        u_int16_t code(200);
+        string text("Ok");
+        u_int16_t classId(0);
+        u_int16_t methodId(0);
+
+        sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+        connector->close();
+        closed = true;
+    }
+}
+
+/**
+ * Attaches the channel to this connection and opens it on the broker.
+ * The channel gets the next id from the static counter, is registered in
+ * the channel map, then channel.open is sent and channel.open-ok awaited
+ * before setQos() is called and the channel is marked as not closed.
+ * NOTE(review): channelIdCounter is a static u_int16_t incremented without
+ * any visible locking and will wrap around to 0 - confirm this is acceptable.
+ */
+void Connection::openChannel(Channel* channel){
+    channel->con = this;
+    channel->id = ++channelIdCounter;
+    channel->out = out;
+    channels[channel->id] = channel;
+    //now send frame to open channel and wait for response
+    string oob;
+    channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
+    channel->setQos();
+    channel->closed = false;
+}
+
+/**
+ * Normal channel closure: delegates to the full closeChannel overload with
+ * reply code 200, reply text "Ok" and no offending class or method id.
+ */
+void Connection::closeChannel(Channel* channel){
+    //the reply text is passed by non-const reference, so it must be a named object
+    string replyText("Ok");
+    const u_int16_t replyCode = 200;
+    const u_int16_t noClass = 0;
+    const u_int16_t noMethod = 0;
+    closeChannel(channel, replyCode, replyText, noClass, noMethod);
+}
+
+/**
+ * Closes the channel with an explicit reply code/text and the class and
+ * method ids to report. Consumers are cancelled and the channel is flagged
+ * closed before channel.close is sent; once channel.close-ok has arrived
+ * the channel is detached from this connection via removeChannel().
+ */
+void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
+    //send frame to close channel
+    channel->cancelAll();
+    channel->closed = true;
+    channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
+    //con and out are also reset by removeChannel() below
+    channel->con = 0;
+    channel->out = 0;
+    removeChannel(channel);
+}
+
+/**
+ * Detaches the channel from this connection without sending anything to
+ * the broker: erases it from the channel map and resets its output
+ * handler, id and connection pointer.
+ */
+void Connection::removeChannel(Channel* channel){
+    //purely local bookkeeping; no frame is sent from here
+
+    channels.erase(channel->id);
+    channel->out = 0;    
+    channel->id = 0;
+    channel->con = 0;
+}
+
+/**
+ * Entry point for every frame read from the wire. Frames on channel 0 are
+ * handled by the connection itself; all others are routed to the channel
+ * registered under the frame's channel id. A frame for an unknown channel
+ * raises a 504 connection error. A QpidError thrown while the channel is
+ * handling the body is turned into a channel exception.
+ */
+void Connection::received(AMQFrame* frame){
+    u_int16_t channelId = frame->getChannel();
+
+    if(channelId == 0){
+        this->handleBody(frame->getBody());
+    }else{
+        //use find() rather than operator[]: the latter would insert a null
+        //entry into the map for every unknown channel id
+        iterator entry = channels.find(channelId);
+        Channel* channel = (entry == channels.end()) ? 0 : entry->second;
+        if(channel == 0){
+            error(504, "Unknown channel");
+        }else{
+            try{
+                channel->handleBody(frame->getBody());
+            }catch(qpid::QpidError& e){
+                //caught by reference: no copy, and channelException takes a QpidError&
+                channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+            }
+        }
+    }
+}
+
+/**
+ * Handles a method frame received on channel 0. If a synchronous request
+ * is waiting for its reply the body is handed to the response handler.
+ * Otherwise a connection.close from the broker is logged and the connector
+ * closed; any other method is logged and reported as a 504 error.
+ */
+void Connection::handleMethod(AMQMethodBody::shared_ptr body){
+    //a pending synchronous request takes whatever arrives next
+    if(responses.isWaiting()){
+        responses.signalResponse(body);
+        return;
+    }
+
+    //anything other than connection.close is unexpected at this point
+    if(!method_bodies.connection_close.match(body.get())){
+        std::cout << "Unhandled method for connection: " << *body << std::endl;
+        error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+        return;
+    }
+
+    //broker initiated close: report the reason and close the socket
+    ConnectionCloseBody* closeRequest = dynamic_cast<ConnectionCloseBody*>(body.get());
+    std::cout << "Connection closed by server: " << closeRequest->getReplyCode() << ":" << closeRequest->getReplyText() << std::endl;
+    connector->close();
+}
+    
+/** Header bodies are not expected on channel 0; reports a 504 error. */
+void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
+    error(504, "Channel error: received header body with channel 0.");
+}
+    
+/** Content bodies are not expected on channel 0; reports a 504 error. */
+void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
+    error(504, "Channel error: received content body with channel 0.");
+}
+    
+/** Heartbeat bodies are accepted and deliberately ignored here. */
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+}
+
+/**
+ * Sends the frame and waits for a reply matching 'body'. expect() is
+ * called before the send so the response handler is already waiting when
+ * the reply arrives.
+ */
+void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+    responses.expect();
+    out->send(frame);
+    responses.receive(body);
+}
+
+/**
+ * Reports a connection-level error: logs it to stdout, sends
+ * connection.close carrying the code, message and offending class/method
+ * ids, waits for connection.close-ok and then closes the connector.
+ * NOTE(review): the log line prints code and msg with no separator and the
+ * ids as [methodid:classid] - confirm this ordering is intended.
+ */
+void Connection::error(int code, const string& msg, int classid, int methodid){
+    std::cout << "Connection exception generated: " << code << msg;
+    if(classid || methodid){
+        std::cout << " [" << methodid << ":" << classid << "]";
+    }
+    std::cout << std::endl;
+    sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
+    connector->close();
+}
+
+/**
+ * Converts a QpidError raised while a channel handled a frame into a
+ * channel close. The error is logged, a reply code is derived from it and
+ * the channel is closed, quoting the class and method ids of the offending
+ * method when one is available.
+ */
+void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
+    std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
+    // NOTE(review): with '==' the first branch always evaluates to 0; this
+    // looks like it was meant to be a range check on e.code - confirm
+    // against the definition of PROTOCOL_ERROR.
+    int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+    string msg = e.msg;
+    if(method == 0){
+        closeChannel(channel, code, msg);
+    }else{
+        closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+/**
+ * Timeout callback for an idle inbound side: logs the missing heartbeat
+ * and closes the connector.
+ * NOTE(review): Connector::checkIdle invokes this from the receive loop,
+ * while Connector::close joins the receiver thread - check for a self-join.
+ */
+void Connection::idleIn(){
+    std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
+    connector->close();
+}
+
+/** Timeout callback for an idle outbound side: sends a heartbeat frame on channel 0. */
+void Connection::idleOut(){
+    out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+}
+
+/**
+ * Shutdown callback: marks the connection as closed and stops every
+ * channel still registered with it. Map entries holding a null pointer
+ * (as a std::map::operator[] lookup of an unknown id would create) are
+ * skipped rather than dereferenced.
+ */
+void Connection::shutdown(){
+    closed = true;
+    //close all channels
+    for(iterator i = channels.begin(); i != channels.end(); ++i){
+        if(i->second != 0){
+            i->second->stop();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+
+#ifndef _Connection_
+#define _Connection_
+
+#include <QpidError.h>
+#include <Connector.h>
+#include <sys/ShutdownHandler.h>
+#include <sys/TimeoutHandler.h>
+
+#include <framing/amqp_framing.h>
+#include <ClientExchange.h>
+#include <IncomingMessage.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <ResponseHandler.h>
+
+namespace qpid {
+namespace client {
+
+    class Channel;
+
+/**
+ * Client-side connection to a broker. Owns the Connector that does the
+ * socket I/O, keeps the map from channel id to Channel, and receives
+ * frames, timeout and shutdown notifications through the handler
+ * interfaces it implements.
+ * NOTE(review): the class owns a raw Connector* released in the destructor
+ * but declares no copy constructor or assignment operator - copying an
+ * instance would presumably lead to a double delete.
+ */
+class Connection : public virtual qpid::framing::InputHandler, 
+        public virtual qpid::sys::TimeoutHandler, 
+        public virtual qpid::sys::ShutdownHandler, 
+        private virtual qpid::framing::BodyHandler{
+
+        typedef std::map<int, Channel*>::iterator iterator;
+
+	// source of channel ids, shared by all Connection instances
+	static u_int16_t channelIdCounter;
+
+	std::string host;
+	int port;
+	const u_int32_t max_frame_size;
+	std::map<int, Channel*> channels; 
+	Connector* connector;
+	qpid::framing::OutputHandler* out;
+	ResponseHandler responses;
+        volatile bool closed;
+    qpid::framing::ProtocolVersion version;
+
+        void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+        void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
+        void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
+	void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+
+	virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+	virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+	virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+	virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    public:
+
+	Connection(bool debug = false, u_int32_t max_frame_size = 65536);
+	~Connection();
+        void open(const std::string& host, int port = 5672, 
+                  const std::string& uid = "guest", const std::string& pwd = "guest", 
+                  const std::string& virtualhost = "/");
+        void close();
+	void openChannel(Channel* channel);
+	/*
+         * Requests that the server close this channel, then removes
+         * the association to the channel from this connection
+         */
+	void closeChannel(Channel* channel);
+	/*
+         * Removes the channel from association with this connection,
+	 * without sending a close request to the server.
+         */
+	void removeChannel(Channel* channel);
+
+	virtual void received(qpid::framing::AMQFrame* frame);
+
+	virtual void idleOut();
+	virtual void idleIn();
+
+	virtual void shutdown();
+
+	inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+    };
+
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include <sys/Time.h>
+#include "Connector.h"
+
+using namespace qpid::sys;
+using namespace qpid::client;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+/**
+ * Creates an unconnected Connector whose receive and send buffers are both
+ * buffer_size bytes. The connector starts closed, with no timeouts and no
+ * handlers installed; every handler pointer is explicitly nulled so that
+ * none of them is ever read uninitialised.
+ */
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
+    debug(_debug), 
+    receive_buffer_size(buffer_size),
+    send_buffer_size(buffer_size),
+    closed(true),
+    lastIn(0), lastOut(0),
+    timeout(0),
+    idleIn(0), idleOut(0), 
+    timeoutHandler(0),
+    shutdownHandler(0),
+    input(0),
+    initialiser(0),
+    output(0),
+    inbuf(receive_buffer_size), 
+    outbuf(send_buffer_size){ }
+
+/** Nothing to release explicitly; members clean up through their own destructors. */
+Connector::~Connector(){ }
+
+/**
+ * Creates a TCP socket, connects it to host:port, marks the connector as
+ * open and starts the receiver thread on this object (see run()).
+ */
+void Connector::connect(const std::string& host, int port){
+    socket = Socket::createTcp();
+    socket.connect(host, port);
+    closed = false;
+    receiver = Thread(this);
+}
+
+/** Writes the protocol initiation header to the socket and deletes it (takes ownership). */
+void Connector::init(ProtocolInitiation* header){
+    writeBlock(header);
+    delete header;
+}
+
+/**
+ * Marks the connector as closed, closes the socket and waits for the
+ * receiver thread to finish.
+ * NOTE(review): if this is reached from the receiver thread itself (for
+ * example via a handler called from run()), join() would be a self-join -
+ * confirm how Thread::join behaves in that case.
+ */
+void Connector::close(){
+    closed = true;
+    socket.close();
+    receiver.join();
+}
+
+/** Sets the handler that receives every decoded inbound frame (used by run()). */
+void Connector::setInputHandler(InputHandler* handler){
+    input = handler;
+}
+
+/** Sets the handler notified from handleClosed() when the connector shuts down. */
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+    shutdownHandler = handler;
+}
+
+/** The connector is its own output handler; frames are written through send(). */
+OutputHandler* Connector::getOutputHandler(){ 
+    return this; 
+}
+
+/**
+ * Writes the frame to the socket, logs it when debug is enabled, then
+ * deletes it: the connector takes ownership of the frame.
+ */
+void Connector::send(AMQFrame* frame){
+    writeBlock(frame);    
+    if(debug) std::cout << "SENT: " << *frame << std::endl; 
+    delete frame;
+}
+
+/**
+ * Encodes the data block into the output buffer and writes the encoded
+ * bytes to the socket. The whole operation runs under writeLock, so
+ * concurrent writers cannot interleave their use of outbuf.
+ */
+void Connector::writeBlock(AMQDataBlock* data){
+    Mutex::ScopedLock l(writeLock);
+    data->encode(outbuf);
+    //transfer data to wire
+    outbuf.flip();
+    writeToSocket(outbuf.start(), outbuf.available());
+    //reset the buffer for the next block
+    outbuf.clear();
+}
+
+/**
+ * Sends 'available' bytes starting at 'data', looping over partial writes
+ * until everything is written or the connector is closed. lastOut is
+ * refreshed after every successful send; it is compared against idleOut
+ * in checkIdle().
+ * NOTE(review): a send() result <= 0 is simply retried with no back-off or
+ * error handling, so a persistent failure spins until 'closed' is set.
+ */
+void Connector::writeToSocket(char* data, size_t available){
+    size_t written = 0;
+    while(written < available && !closed){
+	ssize_t sent = socket.send(data + written, available-written);
+        if(sent > 0) {
+            lastOut = now() * TIME_MSEC;
+            written += sent;
+        }
+    }
+}
+
+/**
+ * Internal shutdown path: marks the connector closed, closes the socket
+ * and notifies the shutdown handler if one is installed. Unlike close(),
+ * it does not join the receiver thread.
+ */
+void Connector::handleClosed(){
+    closed = true;
+    socket.close();
+    if(shutdownHandler) shutdownHandler->shutdown();
+}
+
+/**
+ * Called by the receive loop with the result of every socket read.
+ *  - read timed out: notifies the timeout handler if the inbound side has
+ *    been idle for longer than idleIn;
+ *  - end of stream: shuts the connector down via handleClosed();
+ *  - anything else: records the time of the read in lastIn.
+ * Afterwards the timeout handler is notified if the outbound side has been
+ * idle for longer than idleOut.
+ *
+ * End of stream and the lastIn update are processed even when no timeout
+ * handler is installed; otherwise the receive loop would never notice that
+ * the peer had closed the socket.
+ * NOTE(review): 'now() * TIME_MSEC' is kept as in the original; confirm the
+ * resulting unit matches idleIn/idleOut, which are seconds * 1000.
+ */
+void Connector::checkIdle(ssize_t status){
+    Time t = now() * TIME_MSEC;
+    if(status == Socket::SOCKET_TIMEOUT){
+        if(timeoutHandler && idleIn && (t - lastIn > idleIn)){
+            timeoutHandler->idleIn();
+        }
+    }else if(status == Socket::SOCKET_EOF){
+        handleClosed();
+    }else{
+        lastIn = t;
+    }
+    if(timeoutHandler && idleOut && (t - lastOut > idleOut)){
+        timeoutHandler->idleOut();
+    }
+}
+
+/**
+ * Sets the inbound idle limit from a value in seconds. The socket timeout
+ * is lowered to this limit when the limit is non-zero and either no
+ * timeout has been set yet or the current one is larger.
+ */
+void Connector::setReadTimeout(u_int16_t t){
+    idleIn = t * 1000;//t is in secs
+    if(idleIn == 0){
+        return;
+    }
+    const bool noTimeoutYet = (timeout == 0);
+    if(noTimeoutYet || idleIn < timeout){
+        timeout = idleIn;
+        setSocketTimeout();
+    }
+}
+
+/**
+ * Sets the outbound idle limit from a value in seconds. The socket timeout
+ * is lowered to this limit when the limit is non-zero and either no
+ * timeout has been set yet or the current one is larger.
+ */
+void Connector::setWriteTimeout(u_int16_t t){
+    idleOut = t * 1000;//t is in secs
+    if(idleOut == 0){
+        return;
+    }
+    const bool noTimeoutYet = (timeout == 0);
+    if(noTimeoutYet || idleOut < timeout){
+        timeout = idleOut;
+        setSocketTimeout();
+    }
+}
+
+/** Applies the current 'timeout' value to the socket, scaled by TIME_MSEC. */
+void Connector::setSocketTimeout(){
+    socket.setTimeout(timeout*TIME_MSEC);
+}
+
+/** Sets the handler notified by checkIdle() when either direction has been idle too long. */
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+    timeoutHandler = handler;
+}
+
+/**
+ * Receive loop. Until the connector is closed it reads from the socket
+ * into the input buffer, lets checkIdle() process the read status, decodes
+ * every complete frame in the buffer and passes each one to the input
+ * handler. Bytes of an incomplete trailing frame are kept for the next
+ * read by compacting the buffer. A QpidError raised anywhere in the loop
+ * is logged and shuts the connector down via handleClosed().
+ */
+void Connector::run(){
+    try{
+        while(!closed){
+            ssize_t available = inbuf.available();
+            if(available < 1){
+                //no room left although no complete frame could be decoded
+                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+            }
+            ssize_t received = socket.recv(inbuf.start(), available);
+            checkIdle(received);
+
+            if(!closed && received > 0){
+                inbuf.move(received);
+                inbuf.flip();//position = 0, limit = total data read
+
+                AMQFrame frame;
+                while(frame.decode(inbuf)){
+                    if(debug) std::cout << "RECV: " << frame << std::endl; 
+                    input->received(&frame);
+                }
+                //need to compact buffer to preserve any 'extra' data
+                inbuf.compact();
+            }
+        }
+    }catch(const QpidError& error){
+        //caught by const reference to avoid copying the exception object
+        std::cout << "Error [" << error.code << "] " << error.msg
+                  << " (" << error.location.file << ":" << error.location.line
+                  << ")" << std::endl;
+        handleClosed();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+
+#include <framing/InputHandler.h>
+#include <framing/OutputHandler.h>
+#include <framing/InitiationHandler.h>
+#include <framing/ProtocolInitiation.h>
+#include <sys/ShutdownHandler.h>
+#include <sys/TimeoutHandler.h>
+#include <sys/Thread.h>
+#include <sys/Monitor.h>
+#include <sys/Socket.h>
+
+namespace qpid {
+namespace client {
+
+    /**
+     * Socket-level transport for a client connection. Writes outbound
+     * frames to the socket (it is its own OutputHandler) and runs a
+     * receiver thread that decodes inbound frames and passes them to the
+     * installed InputHandler. It also tracks read/write idle times and
+     * reports them to the TimeoutHandler.
+     * NOTE(review): 'closed' is a plain bool that appears to be accessed
+     * both from the receiver thread (run) and from callers of close()/send()
+     * with no visible synchronisation - confirm.
+     */
+    class Connector : public qpid::framing::OutputHandler, 
+                      private qpid::sys::Runnable
+    {
+        const bool debug;
+	const int receive_buffer_size;
+	const int send_buffer_size;
+
+	bool closed;
+
+        // times of the last successful read and write, see checkIdle()/writeToSocket()
+        int64_t lastIn;
+        int64_t lastOut;
+        // smallest non-zero idle limit set so far; applied as the socket timeout
+        int64_t timeout;
+        // idle limits, stored as seconds * 1000
+        u_int32_t idleIn;
+        u_int32_t idleOut;
+
+        qpid::sys::TimeoutHandler* timeoutHandler;
+        qpid::sys::ShutdownHandler* shutdownHandler;
+	qpid::framing::InputHandler* input;
+	qpid::framing::InitiationHandler* initialiser;
+	qpid::framing::OutputHandler* output;
+	
+	qpid::framing::Buffer inbuf;
+	qpid::framing::Buffer outbuf;
+
+        // serialises writers of outbuf/socket, see writeBlock()
+        qpid::sys::Mutex writeLock;
+	qpid::sys::Thread receiver;
+
+	qpid::sys::Socket socket;
+        
+        void checkIdle(ssize_t status);
+	void writeBlock(qpid::framing::AMQDataBlock* data);
+	void writeToSocket(char* data, size_t available);
+        void setSocketTimeout();
+
+	void run();
+	void handleClosed();
+
+    public:
+	Connector(bool debug = false, u_int32_t buffer_size = 1024);
+	virtual ~Connector();
+	virtual void connect(const std::string& host, int port);
+	virtual void init(qpid::framing::ProtocolInitiation* header);
+	virtual void close();
+	virtual void setInputHandler(qpid::framing::InputHandler* handler);
+	virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+	virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+	virtual qpid::framing::OutputHandler* getOutputHandler();
+	virtual void send(qpid::framing::AMQFrame* frame);
+        virtual void setReadTimeout(u_int16_t timeout);
+        virtual void setWriteTimeout(u_int16_t timeout);
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/Connector.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <IncomingMessage.h>
+#include <QpidError.h>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+// One constructor per kind of method frame that can introduce a message.
+// Each stores its frame in the matching member (delivered / returned /
+// response); the other two are not set by that constructor.
+IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
+IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
+IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+
+/** Nothing to release explicitly; all members are shared pointers or containers of them. */
+IncomingMessage::~IncomingMessage(){
+}
+
+/** Stores the content header of this message, replacing any previous one. */
+void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
+    header = _header;
+}
+
+/** Appends one more content frame to the end of this message's body. */
+void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){
+    content.push_back(_content);
+}
+
+/**
+ * A message is complete once its header has arrived and the content
+ * frames collected so far add up to the size announced in that header.
+ */
+bool IncomingMessage::isComplete(){
+    if(header == 0){
+        //no header yet, so the expected size is unknown
+        return false;
+    }
+    return header->getContentSize() == contentSize();
+}
+
+/** True when the message was constructed from a basic.return frame. */
+bool IncomingMessage::isReturn(){
+    return returned;
+}
+
+/** True when the message was constructed from a basic.deliver frame. */
+bool IncomingMessage::isDelivery(){
+    return delivered;
+}
+
+/** True when the message was constructed from a basic.get-ok frame. */
+bool IncomingMessage::isResponse(){
+    return response;
+}
+
+/**
+ * Returns the consumer tag of the delivery frame. Raises a CLIENT_ERROR
+ * through THROW_QPID_ERROR when the message is not a delivery.
+ */
+const string& IncomingMessage::getConsumerTag(){
+    if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
+    return delivered->getConsumerTag();
+}
+
+/**
+ * Returns the delivery tag of the delivery frame. Raises a CLIENT_ERROR
+ * through THROW_QPID_ERROR when the message is not a delivery.
+ */
+u_int64_t IncomingMessage::getDeliveryTag(){
+    if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
+    return delivered->getDeliveryTag();
+}
+
+/** Returns a reference to the stored header pointer (empty until setHeader() is called). */
+AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
+    return header;
+}
+
+/**
+ * Copies the message body into 's': the data of all content frames,
+ * concatenated in arrival order. 's' is always overwritten; for a message
+ * without content frames it becomes empty instead of keeping whatever the
+ * caller had left in it.
+ */
+void IncomingMessage::getData(string& s){
+    //assemble into a local first so that 's' may safely alias frame data
+    string data;
+    for(size_t i = 0; i < content.size(); ++i){
+        data += content[i]->getData();
+    }
+    s.swap(data);
+}
+
+/** Returns the sum of size() over all content frames received so far. */
+u_int64_t IncomingMessage::contentSize(){
+    u_int64_t total = 0;
+    for(std::vector<AMQContentBody::shared_ptr>::iterator frame = content.begin(); frame != content.end(); ++frame){
+        total += (*frame)->size();
+    }
+    return total;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include <framing/amqp_framing.h>
+
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+#include <ClientMessage.h>
+
+namespace qpid {
+namespace client {
+
+    /**
+     * Accumulates the frames that make up one inbound message: the method
+     * frame that introduced it (deliver, return or get-ok), the content
+     * header and the content frames. isComplete() reports when the content
+     * received matches the size given in the header.
+     */
+    class IncomingMessage{
+        //content will be preceded by one of these method frames
+	qpid::framing::BasicDeliverBody::shared_ptr delivered;
+	qpid::framing::BasicReturnBody::shared_ptr returned;
+	qpid::framing::BasicGetOkBody::shared_ptr response;
+	qpid::framing::AMQHeaderBody::shared_ptr header;
+	std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
+
+	// total size of the content frames collected so far
+	u_int64_t contentSize();
+    public:
+	IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
+	IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
+	IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
+        ~IncomingMessage();
+	void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+	void addContent(qpid::framing::AMQContentBody::shared_ptr content);
+	bool isComplete();
+	bool isReturn();
+	bool isDelivery();
+	bool isResponse();
+	const std::string& getConsumerTag();//only relevant if isDelivery()
+	qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
+        u_int64_t getDeliveryTag();
+	void getData(std::string& data);
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date