You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/12/07 20:13:11 UTC

svn commit: r602182 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/ src/qpid/client/ src/tests/

Author: aconway
Date: Fri Dec  7 11:13:09 2007
New Revision: 602182

URL: http://svn.apache.org/viewvc?rev=602182&view=rev
Log:

Summary:
 - Replaced InProcessBroker with BrokerFixture, uses a full loopback
   broker for more realistic tests.
 - Extracted non-generated parts of Session_0_10 into SessionBase.
 - Sundry small fixes.

src/tests/BrokerFixture.h
 - in process broker with loopback connections.
 - tests can force a disorderly disconnect.

src/qpid/client/Connector.h
 - back door to private members for BrokerFixture.
 - close() in destructor to avoid leaks.

src/qpid/client/ConnectionImpl.h,cpp:
 - close() in destructor, to fix hang when destroyed without being closed.

src/qpid/client/CompletionTracker.h,.cpp:
 - Fixed race in close/add.

src/qpid/client/SessionBase.h,cpp:
 - Extracted all non-generated code from Session_0_10 into SessionBase
 - Added sync()
 
src/tests/exception_test.cpp: Converted to boost & BrokerFixture

src/tests/ClientChannelTest.cpp, ClientSessionTest.cpp: Use BrokerFixture

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Fri Dec  7 11:13:09 2007
@@ -68,19 +68,8 @@
 
   def generate()
     h_file(@file) {
-      include "qpid/framing/amqp_framing.h"
-      include "qpid/framing/Uuid.h"
-      include "qpid/framing/amqp_structs.h"
-      include "qpid/framing/ProtocolVersion.h"
-      include "qpid/framing/MethodContent.h"
-      include "qpid/framing/TransferContent.h"
-      include "qpid/client/Completion.h"
-      include "qpid/client/ConnectionImpl.h"
-      include "qpid/client/Response.h"
-      include "qpid/client/SessionCore.h"
-      include "qpid/client/TypedResult.h"
-      include "qpid/shared_ptr.h"
-      include "<string>"
+      include "qpid/client/SessionBase.h"
+
       namespace("qpid::client") { 
         genl "using std::string;"
         genl "using framing::Content;"
@@ -94,59 +83,23 @@
             genl "AMQP #{@amqp.version} session API."
             genl @amqp.class_("session").doc
           }
-          cpp_class(@classname) {
+          cpp_class(@classname, "public SessionBase") {
             public
-            gen <<EOS
-#{@classname}();
-
-/** Get the next message frame-set from the session. */
-framing::FrameSet::shared_ptr get() { return impl->get(); }
-
-/** Get the session ID */
-Uuid getId() const { return impl->getId(); }
-
-/** @param sync if true all session methods block till a response arrives. */
-void setSynchronous(bool sync) { impl->setSync(sync); }
-
-/** Suspend the session, can be resumed on a different connection.
- * @see Connection::resume()
- */
-void suspend();
-
-/** Close the session */
-void close();
-
-Execution& execution() { return impl->getExecution(); }
-
-typedef framing::TransferContent DefaultContent;
-EOS
+            genl "Session_0_10() {}"
+            genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}"
             session_methods.each { |m|
               genl
               doxygen(m)
               args=m.sig_c_default.join(", ") 
               genl "#{m.return_type} #{m.session_function}(#{args});" 
             }
-            genl
-            protected
-            gen <<EOS
-shared_ptr<SessionCore> impl;
-framing::ProtocolVersion version;
-friend class Connection;
-#{@classname}(shared_ptr<SessionCore>);
-EOS
           }}}}
 
     cpp_file(@file) { 
       include @classname
       include "qpid/framing/all_method_bodies.h"
       namespace(@namespace) {
-        gen <<EOS
-using namespace framing;
-#{@classname}::#{@classname}() {}
-#{@classname}::#{@classname}(shared_ptr<SessionCore> core) : impl(core) {}
-void #{@classname}::suspend() { impl->suspend(); }
-void #{@classname}::close() { impl->close(); }
-EOS
+        genl "using namespace framing;"
         session_methods.each { |m|
           genl
           sig=m.signature_c.join(", ")

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Dec  7 11:13:09 2007
@@ -208,7 +208,7 @@
 libqpidclient_la_LIBADD = libqpidcommon.la
 libqpidclient_la_SOURCES =			\
   $(rgen_client_cpp)				\
-  qpid/client/Session.h				\
+  qpid/client/SessionBase.cpp			\
   qpid/client/Connection.cpp			\
   qpid/client/Channel.cpp			\
   qpid/client/Exchange.cpp			\
@@ -335,6 +335,7 @@
   qpid/client/MessageListener.h \
   qpid/client/MessageQueue.h \
   qpid/client/Response.h \
+  qpid/client/SessionBase.h \
   qpid/client/Session.h \
   qpid/client/SessionCore.h \
   qpid/client/StateManager.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Fri Dec  7 11:13:09 2007
@@ -174,7 +174,7 @@
 
 bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
     string tag = "get-handler";
-    ScopedDivert handler(tag, session.execution().getDemux());
+    ScopedDivert handler(tag, session.getExecution().getDemux());
     Demux::QueuePtr incoming = handler.getQueue();
 
     session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
@@ -243,7 +243,7 @@
             bool send = i->second.ackMode == AUTO_ACK
                 || (prefetch &&  ++(i->second.count) > (prefetch / 2));
             if (send) i->second.count = 0;
-            session.execution().completed(content.getId(), true, send);
+            session.getExecution().completed(content.getId(), true, send);
         }
     } else {
         QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);                        

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Fri Dec  7 11:13:09 2007
@@ -31,12 +31,13 @@
 const std::string empty;
 }
 
-CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker() : closed(false) {}
 CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
 
 void CompletionTracker::close()
 {   
     sys::Mutex::ScopedLock l(lock);
+    closed=true;
     while (!listeners.empty()) {
         Record r(listeners.front());
         {
@@ -47,17 +48,18 @@
     }
 }
 
+
 void CompletionTracker::completed(const SequenceNumber& _mark)
 {   
     sys::Mutex::ScopedLock l(lock);
     mark = _mark;
     while (!listeners.empty() && !(listeners.front().id > mark)) {
         Record r(listeners.front());
+        listeners.pop_front();
         {
             sys::Mutex::ScopedUnlock u(lock);
             r.completed();
         }
-        listeners.pop_front();
     }
 }
 
@@ -88,14 +90,13 @@
 bool CompletionTracker::add(const Record& record)
 {
     sys::Mutex::ScopedLock l(lock);
-    if (record.id < mark) {
+    if (record.id < mark || closed) {
         return false;
     } else {
         //insert at the correct position
         Listeners::iterator i = seek(record.id);
         if (i == listeners.end()) i = listeners.begin();
         listeners.insert(i, record);
-
         return true;
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Fri Dec  7 11:13:09 2007
@@ -60,7 +60,8 @@
     };
 
     typedef std::list<Record> Listeners;
-
+    bool closed;
+    
     sys::Mutex lock;
     framing::SequenceNumber mark;
     Listeners listeners;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Dec  7 11:13:09 2007
@@ -44,6 +44,8 @@
     connector->setShutdownHandler(this);
 }
 
+ConnectionImpl::~ConnectionImpl() { close(); }
+
 void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
 {
     Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Dec  7 11:13:09 2007
@@ -62,6 +62,8 @@
     typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
 
     ConnectionImpl(boost::shared_ptr<Connector> c);
+    ~ConnectionImpl();
+    
     void addSession(const boost::shared_ptr<SessionCore>&);
         
     void open(const std::string& host, int port = 5672, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Dec  7 11:13:09 2007
@@ -51,7 +51,8 @@
 {
 }
 
-Connector::~Connector(){
+Connector::~Connector() {
+    close();
     if (receiver.id() && receiver.id() != Thread::current().id())
         receiver.join();
 }
@@ -76,7 +77,6 @@
     receiver = Thread(this);
 }
 
-// Call with closedLock held
 bool Connector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
     if (!closed) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Fri Dec  7 11:13:09 2007
@@ -88,6 +88,8 @@
     void eof(qpid::sys::AsynchIO&);
     
   friend class Channel;
+  friend class TestConnector;
+
   public:
     Connector(framing::ProtocolVersion pVersion,
               bool debug = false, uint32_t buffer_size = 1024);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Dec  7 11:13:09 2007
@@ -52,8 +52,8 @@
     : session(s), running(false)
 {
     queue = q.empty() ? 
-        session.execution().getDemux().getDefault() : 
-        session.execution().getDemux().get(q); 
+        session.getExecution().getDemux().getDefault() : 
+        session.getExecution().getDemux().get(q); 
 }
 
 void Dispatcher::start()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Dec  7 11:13:09 2007
@@ -167,6 +167,8 @@
     out(frame);    
 }
 
+SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; }
+
 SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
 {
     Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Dec  7 11:13:09 2007
@@ -80,6 +80,7 @@
     framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
     framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, 
                                  ResultListener=ResultListener());
+    framing::SequenceNumber lastSent() const;
     void sendSyncRequest();
     void sendFlushRequest();
     void completed(const framing::SequenceNumber& id, bool cumulative, bool send);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Fri Dec  7 11:13:09 2007
@@ -65,12 +65,12 @@
 
     void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
     {
-        session.execution().completed(id, cumulative, send);
+        session.getExecution().completed(id, cumulative, send);
     }
 
     void acknowledge(bool cumulative = true, bool send = true) const
     {
-        const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+        const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
     }
 
     /**@internal for incoming messages */

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp Fri Dec  7 11:13:09 2007
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+
+SessionBase::SessionBase() {}
+SessionBase::~SessionBase() {}
+SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {}
+void SessionBase::suspend() { impl->suspend(); }
+void SessionBase::close() { impl->close(); }
+void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
+bool SessionBase::isSynchronous() const { return impl->isSync(); }
+Execution& SessionBase::getExecution() { return impl->getExecution(); }
+Uuid SessionBase::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h Fri Dec  7 11:13:09 2007
@@ -0,0 +1,101 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Response.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+using framing::Uuid;
+
+/**
+ * Basic session operations that are not derived from AMQP XML methods.
+ */
+class SessionBase
+{
+  public:
+    SessionBase();
+    ~SessionBase();
+
+    /** Get the next message frame-set from the session. */
+    framing::FrameSet::shared_ptr get();
+    
+    /** Get the session ID */
+    Uuid getId() const;
+
+    /**
+     * In synchronous mode, the session sets the sync bit on every
+     * command and waits for the broker's response before returning.
+     * Note this gives lower throughput than non-synchronous mode.
+     *
+     * In non-synchronous mode commands are sent without waiting
+     * for a respose (you can use the returned Completion object
+     * to wait for completion.)
+     * 
+     *@param if true set the session to synchronous mode, else
+     * set it to non-synchronous mode.
+     */
+    void setSynchronous(bool isSync);
+
+    bool isSynchronous() const;
+
+    /**
+     * Suspend the session, can be resumed on a different connection.
+     * @see Connection::resume()
+     */
+    void suspend();
+    
+    /** Close the session */
+    void close();
+    
+    Execution& getExecution();
+    
+    typedef framing::TransferContent DefaultContent;
+
+  protected:
+    shared_ptr<SessionCore> impl;
+    framing::ProtocolVersion version;
+    friend class Connection;
+    SessionBase(shared_ptr<SessionCore>);
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_SESSIONBASE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Dec  7 11:13:09 2007
@@ -203,8 +203,10 @@
     // user thread
     {
         Lock l(state);
-        if (state==OPEN)
-            doSuspend(REPLY_SUCCESS, OK);
+        if (state==SUSPENDED) { // Clear error that caused suspend
+            code=REPLY_SUCCESS;
+            text=OK;
+        }
         check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION);
         SequenceNumber sendAck=session->resuming();
         attaching(c);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Dec  7 11:13:09 2007
@@ -38,29 +38,30 @@
       confirmMode(true), acquireMode(false)
 {}
 
-void SubscriptionManager::subscribeInternal(
+Completion SubscriptionManager::subscribeInternal(
     const std::string& q, const std::string& dest)
 {
-    session.messageSubscribe(arg::queue=q, arg::destination=dest,
+    Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
                              arg::confirmMode=confirmMode, arg::acquireMode=acquireMode);
     setFlowControl(dest, messages, bytes, window);
+    return c;
 }
 
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const std::string& d)
 {
     std::string dest=d.empty() ? q:d;
     dispatcher.listen(dest, &listener, autoAck);
-    subscribeInternal(q, dest);
+    return subscribeInternal(q, dest);
 }
 
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const std::string& d)
 {
     std::string dest=d.empty() ? q:d;
     lq.session=session;
-    lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
-    subscribeInternal(q, dest);
+    lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
+    return subscribeInternal(q, dest);
 }
 
 void SubscriptionManager::setFlowControl(
@@ -91,7 +92,9 @@
     session.messageCancel(dest);
 }
 
-void SubscriptionManager::run(bool autoStop)
+void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManager::run()
 {
     dispatcher.setAutoStop(autoStop);
     dispatcher.run();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Dec  7 11:13:09 2007
@@ -23,21 +23,24 @@
  */
 #include "qpid/sys/Mutex.h"
 #include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
 #include <qpid/client/Session_0_10.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/client/LocalQueue.h>
+#include <qpid/sys/Runnable.h>
+
 #include <set>
 #include <sstream>
 
 namespace qpid {
 namespace client {
 
-class SubscriptionManager
+class SubscriptionManager : public sys::Runnable
 {
     typedef sys::Mutex::ScopedLock Lock;
     typedef sys::Mutex::ScopedUnlock Unlock;
 
-    void subscribeInternal(const std::string& q, const std::string& dest);
+    Completion subscribeInternal(const std::string& q, const std::string& dest);
     
     qpid::client::Dispatcher dispatcher;
     qpid::client::Session_0_10& session;
@@ -47,8 +50,9 @@
     AckPolicy autoAck;
     bool confirmMode;
     bool acquireMode;
-
-public:
+    bool autoStop;
+    
+  public:
     SubscriptionManager(Session_0_10& session);
     
     /**
@@ -59,9 +63,9 @@
      *@param tag Unique destination tag for the listener.
      * If not specified, the queue name is used.
      */
-    void subscribe(MessageListener& listener,
-                   const std::string& queue,
-                   const std::string& tag=std::string());
+    Completion subscribe(MessageListener& listener,
+                         const std::string& queue,
+                         const std::string& tag=std::string());
 
     /**
      * Subscribe a LocalQueue to receive messages from queue.
@@ -70,17 +74,21 @@
      *@param tag Unique destination tag for the listener.
      * If not specified, the queue name is used.
      */
-    void subscribe(LocalQueue& localQueue,
+    Completion subscribe(LocalQueue& localQueue,
                    const std::string& queue,
                    const std::string& tag=std::string());
 
     /** Cancel a subscription. */
     void cancel(const std::string tag);
 
-    /** Deliver messages until stop() is called.
-     *@param autoStop If true, return when all listeners are cancelled.
+    /** Deliver messages until stop() is called. */
+    void run();
+
+    /** If set true, run() will stop when all subscriptions
+     * are cancelled. If false, run will only stop when stop()
+     * is called. True by default.
      */
-    void run(bool autoStop=true);
+    void setAutoStop(bool set=true);
 
     /** Cause run() to return */
     void stop();

Added: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Dec  7 11:13:09 2007
@@ -0,0 +1,101 @@
+#ifndef TESTS_BROKERFIXTURE_H
+#define TESTS_BROKERFIXTURE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
+
+namespace qpid { namespace client {
+/** Back door into private Connector stuff */
+struct TestConnector {
+    static void disconnect(qpid::client::Connector& c) {
+        c.socket.close();
+        c.handleClosed();
+    }
+};
+}}
+
+/**
+ * A fixture to create an in-process broker and connect to it for tests.
+ */
+struct BrokerFixture {
+    typedef qpid::broker::Broker Broker;
+    typedef boost::shared_ptr<Broker> BrokerPtr;
+
+    struct OpenConnection : public qpid::client::Connection {
+        OpenConnection(int port) { open("localhost", port); }
+    };
+    
+    BrokerPtr broker;
+    qpid::sys::Thread brokerThread;
+    OpenConnection connection;
+    qpid::client::Session_0_10 session;
+    qpid::client::SubscriptionManager subs;
+    qpid::client::LocalQueue lq;
+        
+    BrokerPtr newBroker() {
+        Broker::Options opts;
+        opts.port=0;
+        opts.workerThreads=1;
+        BrokerPtr b=Broker::create(opts);
+        // TODO aconway 2007-12-05: Without the following line
+        // the test can hang in the connection ctor. This is
+        // a race condition that should be fixed.
+        b->getPort(); 
+        return b;
+    };
+
+    BrokerFixture() : broker(newBroker()),
+                      brokerThread(*broker),
+                      connection(broker->getPort()),
+                      session(connection.newSession()),
+                      subs(session)
+    {}
+
+    ~BrokerFixture() {
+        connection.close();
+        broker->shutdown();
+        brokerThread.join();
+    }
+
+    /** Open a connection to the local broker */
+    void open(qpid::client::Connection& c) {
+        c.open("localhost", broker->getPort());
+    }
+
+    /** Close a connection's socket */ 
+    static void disconnect(qpid::client::Connection& c) {
+        struct Expose : public qpid::client::Connection {
+            void disconnect() {
+                qpid::client::TestConnector::disconnect(*impl->getConnector());
+            }
+        };
+        static_cast<Expose&>(c).disconnect();
+    }
+};
+
+#endif  /*!TESTS_BROKERFIXTURE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Fri Dec  7 11:13:09 2007
@@ -20,7 +20,7 @@
  */
 #include <vector>
 #include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
 #include "qpid/client/Channel.h"
 #include "qpid/client/Message.h"
 #include "qpid/client/Queue.h"
@@ -44,7 +44,7 @@
  * The test base defines the tests methods, derived classes
  * instantiate the channel in Basic or Message mode.
  */
-class ChannelTestBase : public CppUnit::TestCase  
+class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
 {
     struct Listener: public qpid::client::MessageListener {
         vector<Message> messages;
@@ -56,7 +56,6 @@
         }
     };
     
-    qpid::InProcessBrokerClient connection; 
     const std::string qname;
     const std::string data;
     Queue queue;
@@ -69,8 +68,7 @@
   public:
 
     ChannelTestBase()
-        : connection(FRAME_MAX),
-          qname("testq"), data("hello"),
+        : qname("testq"), data("hello"),
           queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
     {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Dec  7 11:13:09 2007
@@ -19,7 +19,7 @@
  *
  */
 #include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
 #include "qpid/client/Dispatcher.h"
 #include "qpid/client/Session_0_10.h"
 #include "qpid/framing/TransferContent.h"
@@ -61,7 +61,7 @@
     }
 };
 
-class ClientSessionTest : public CppUnit::TestCase
+class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
 {
     CPPUNIT_TEST_SUITE(ClientSessionTest);
     CPPUNIT_TEST(testQueueQuery);
@@ -74,23 +74,14 @@
     CPPUNIT_TEST_SUITE_END();
 
     shared_ptr<broker::Broker> broker;
-    Session_0_10 session;
-    // Defer construction & thread creation to setUp
-    boost::optional<InProcessConnection> c;
-    boost::optional<InProcessConnection> c2;
 
-public:
+  public:
 
     void setUp() {
         broker = broker::Broker::create();
-        c=boost::in_place<InProcessConnection>(broker);
-        c2=boost::in_place<InProcessConnection>(broker);
     }
 
     void tearDown() {
-        c2.reset();
-        c.reset();
-        broker.reset();
     }
 
     void declareSubscribe(const std::string& q="my-queue",
@@ -109,7 +100,7 @@
     
     void testQueueQuery() 
     {
-        session = c->newSession();
+        session =connection.newSession();
         session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
         TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue"));
         CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
@@ -120,7 +111,7 @@
 
     void testTransfer()
     {
-        session = c->newSession();
+        session =connection.newSession();
         declareSubscribe();
         session.messageTransfer(content=TransferContent("my-message", "my-queue"));
         //get & test the message:
@@ -128,12 +119,12 @@
         CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
         CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
         //confirm receipt:
-        session.execution().completed(msg->getId(), true, true);
+        session.getExecution().completed(msg->getId(), true, true);
     }
 
     void testDispatcher()
     {
-        session = c->newSession();
+        session =connection.newSession();
         declareSubscribe();
 
         TransferContent msg1("One");
@@ -161,16 +152,16 @@
     }
 
     void testResumeExpiredError() {
-        session = c->newSession(0);
+        session =connection.newSession(0);
         session.suspend();  // session has 0 timeout.
         try {
-            c->resume(session);
+           connection.resume(session);
             CPPUNIT_FAIL("Expected InvalidArgumentException.");
         } catch(const InternalErrorException&) {}
     }
 
     void testUseSuspendedError() {
-        session = c->newSession(60);
+        session =connection.newSession(60);
         session.suspend();
         try {
             session.exchangeQuery(name="amq.fanout");
@@ -179,26 +170,27 @@
     }
 
     void testSuspendResume() {
-        session = c->newSession(60);
+        session =connection.newSession(60);
         declareSubscribe();
         session.suspend();
         // Make sure we are still subscribed after resume.
-        c->resume(session);
+       connection.resume(session);
         session.messageTransfer(content=TransferContent("my-message", "my-queue"));
         FrameSet::shared_ptr msg = session.get();
         CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
     }
 
     void testDisconnectResume() {
-        session = c->newSession(60);
+        session =connection.newSession(60);
         session.queueDeclare(queue="before");
         CPPUNIT_ASSERT(queueExists("before"));
-        // Simulate lost frames.
-        c->discard();
         session.queueDeclare(queue=string("after"));
-        c->disconnect(); // Simulate disconnect, resume on a new connection.
-        c2->resume(session);
+        disconnect(connection);
+        Connection c2;
+        open(c2);
+        c2.resume(session);
         CPPUNIT_ASSERT(queueExists("after"));
+        c2.close();
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Fri Dec  7 11:13:09 2007
@@ -20,7 +20,7 @@
  */
 
 #include "unit_test.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
@@ -31,74 +31,74 @@
 
 using namespace std;
 using namespace qpid;
+using namespace sys;
 using namespace client;
 using namespace framing;
 
-struct Fixture {
-    InProcessConnection connection;
-    InProcessConnection connection2;
-    Session_0_10 session;
-    SubscriptionManager sub;
-    LocalQueue q;
-
-    Fixture() : connection(),
-                connection2(connection.getBroker()),
-                session(connection.newSession()),
-                sub(session)
-    {
-        session.queueDeclare(arg::queue="q");
+using boost::bind;
+using boost::function;
+
+template <class Ex>
+struct Catcher : public Runnable {
+    function<void ()> f;
+    bool caught;
+    Thread thread;
+    
+    Catcher(function<void ()> f_) : f(f_), caught(false), thread(this) {}
+    ~Catcher() { join(); }
+    
+    void run() {
+        try { f(); }
+        catch(const Ex& e) {
+            caught=true;
+            BOOST_MESSAGE(e.what());
+        }
+        catch(const std::exception& e) {
+            BOOST_ERROR(string("Bad exception: ")+e.what());
+        }
+        catch(...) {
+            BOOST_ERROR(string("Bad exception: unknown"));
+        }
     }
-};
 
+    bool join() {
+        if (thread.id()) {
+            thread.join();
+            thread=Thread();
+        }
+        return caught;
+    }
+};
 
-// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate
-// simulation of shutdown behaviour. It should override only 
-// Connector.run() to substitute NetworkQueues for the Dispatcher.
-// 
-// template <class Ex>
-// struct Catcher : public sys::Runnable {
-//     Session_0_10 s;
-//     boost::function<void ()> f;
-//     bool caught;
-//     Catcher(Session_0_10 s_, boost::function<void ()> f_)
-//         : s(s_), f(f_), caught(false) {}
-//     void run() {
-//         try { f(); } catch(const Ex& e) {
-//             caught=true;
-//             BOOST_MESSAGE(e.what());
-//         }
-//     }
-// };
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) {
-//     Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session));
-//     sub.subscribe(q, "q");
-//     sys::Thread t(get);
-//     connection.disconnect();
-//     t.join();
-//     BOOST_CHECK(get.caught);
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
+    Catcher<ClosedException> get(bind(&Session_0_10::get, session));
+    disconnect(connection);
+    BOOST_CHECK(get.join());
+}
+
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
+    session.queueDeclare(arg::queue="q");
+    subs.subscribe(lq, "q");
+    Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+    disconnect(connection);
+    BOOST_CHECK(pop.join());
+}
 
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) {
+// FIXME aconway 2007-12-07: This test hangs sporadically at t.join
+// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
 //     struct NullListener : public MessageListener {
 //         void received(Message&) { BOOST_FAIL("Unexpected message"); }
 //     } l;
-//     sub.subscribe(l, "q");
-//     connection.disconnect();
-//     try {
-//         sub.run();
-//         BOOST_FAIL("Expected exception");
-//     } catch (const Exception&e) { BOOST_FAIL(e.what()); }
-//     try {
-//         session.queueDeclare(arg::queue="foo");
-//         BOOST_FAIL("Expected exception");
-//     } catch (const Exception&e) { BOOST_FAIL(e.what()); }
+//     session.queueDeclare(arg::queue="q");
+//     subs.subscribe(l, "q");
+//     Thread t(subs);
+//     disconnect(connection);
+//     t.join();
+//     BOOST_CHECK_THROW(session.close(), InternalErrorException);    
 // }
 
-// TODO aconway 2007-11-30: setSynchronous appears not to work.
-// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) {
-//     session.setSynchronous(true);
-//     BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException);
-// }
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+    BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
+}
 
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Fri Dec  7 11:13:09 2007
@@ -205,7 +205,7 @@
             }
         }
         // Make sure this is all completed before we return.
-        session.execution().sendSyncRequest();
+        session.getExecution().sendSyncRequest();
     }
 };
 
@@ -231,13 +231,9 @@
     
     // Functor to collect rates.
     void operator()(const string& data) {
-        try {
-            double d=lexical_cast<double>(data);
-            values.push_back(d);
-            sum += d;
-        } catch (...) {
-            throw Exception(QPID_MSG("Bad data, expecting double: " << data));
-        }
+        double d=lexical_cast<double>(data);
+        values.push_back(d);
+        sum += d;
     }
     
     double mean() const {