You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/12/07 20:13:11 UTC
svn commit: r602182 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/
src/ src/qpid/client/ src/tests/
Author: aconway
Date: Fri Dec 7 11:13:09 2007
New Revision: 602182
URL: http://svn.apache.org/viewvc?rev=602182&view=rev
Log:
Summary:
- Replaced InProcessBroker with BrokerFixture, uses a full loopback
broker for more realistic tests.
- Extracted non-generated parts of Session_0_10 into SessionBase.
- Sundry small fixes.
src/tests/BrokerFixture.h
- in process broker with loopback connections.
- tests can force a disorderly disconnect.
src/qpid/client/Connector.h
- back door to private members for BrokerFixture.
- close() in destructor to avoid leaks.
src/qpid/client/ConnectionImpl.h,cpp:
- close() in destructor, to fix hang when destroyed without being closed.
src/qpid/client/CompletionTracker.h,.cpp:
- Fixed race in close/add.
src/qpid/client/SessionBase.h,cpp:
- Extracted all non-generated code from Session_0_10 into SessionBase
- Added sync()
src/tests/exception_test.cpp: Converted to boost & BrokerFixture
src/tests/ClientChannelTest.cpp, ClientSessionTest.cpp: Use BrokerFixture
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Fri Dec 7 11:13:09 2007
@@ -68,19 +68,8 @@
def generate()
h_file(@file) {
- include "qpid/framing/amqp_framing.h"
- include "qpid/framing/Uuid.h"
- include "qpid/framing/amqp_structs.h"
- include "qpid/framing/ProtocolVersion.h"
- include "qpid/framing/MethodContent.h"
- include "qpid/framing/TransferContent.h"
- include "qpid/client/Completion.h"
- include "qpid/client/ConnectionImpl.h"
- include "qpid/client/Response.h"
- include "qpid/client/SessionCore.h"
- include "qpid/client/TypedResult.h"
- include "qpid/shared_ptr.h"
- include "<string>"
+ include "qpid/client/SessionBase.h"
+
namespace("qpid::client") {
genl "using std::string;"
genl "using framing::Content;"
@@ -94,59 +83,23 @@
genl "AMQP #{@amqp.version} session API."
genl @amqp.class_("session").doc
}
- cpp_class(@classname) {
+ cpp_class(@classname, "public SessionBase") {
public
- gen <<EOS
-#{@classname}();
-
-/** Get the next message frame-set from the session. */
-framing::FrameSet::shared_ptr get() { return impl->get(); }
-
-/** Get the session ID */
-Uuid getId() const { return impl->getId(); }
-
-/** @param sync if true all session methods block till a response arrives. */
-void setSynchronous(bool sync) { impl->setSync(sync); }
-
-/** Suspend the session, can be resumed on a different connection.
- * @see Connection::resume()
- */
-void suspend();
-
-/** Close the session */
-void close();
-
-Execution& execution() { return impl->getExecution(); }
-
-typedef framing::TransferContent DefaultContent;
-EOS
+ genl "Session_0_10() {}"
+ genl "Session_0_10(shared_ptr<SessionCore> core) : SessionBase(core) {}"
session_methods.each { |m|
genl
doxygen(m)
args=m.sig_c_default.join(", ")
genl "#{m.return_type} #{m.session_function}(#{args});"
}
- genl
- protected
- gen <<EOS
-shared_ptr<SessionCore> impl;
-framing::ProtocolVersion version;
-friend class Connection;
-#{@classname}(shared_ptr<SessionCore>);
-EOS
}}}}
cpp_file(@file) {
include @classname
include "qpid/framing/all_method_bodies.h"
namespace(@namespace) {
- gen <<EOS
-using namespace framing;
-#{@classname}::#{@classname}() {}
-#{@classname}::#{@classname}(shared_ptr<SessionCore> core) : impl(core) {}
-void #{@classname}::suspend() { impl->suspend(); }
-void #{@classname}::close() { impl->close(); }
-EOS
+ genl "using namespace framing;"
session_methods.each { |m|
genl
sig=m.signature_c.join(", ")
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Dec 7 11:13:09 2007
@@ -208,7 +208,7 @@
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
$(rgen_client_cpp) \
- qpid/client/Session.h \
+ qpid/client/SessionBase.cpp \
qpid/client/Connection.cpp \
qpid/client/Channel.cpp \
qpid/client/Exchange.cpp \
@@ -335,6 +335,7 @@
qpid/client/MessageListener.h \
qpid/client/MessageQueue.h \
qpid/client/Response.h \
+ qpid/client/SessionBase.h \
qpid/client/Session.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Fri Dec 7 11:13:09 2007
@@ -174,7 +174,7 @@
bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
string tag = "get-handler";
- ScopedDivert handler(tag, session.execution().getDemux());
+ ScopedDivert handler(tag, session.getExecution().getDemux());
Demux::QueuePtr incoming = handler.getQueue();
session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
@@ -243,7 +243,7 @@
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.execution().completed(content.getId(), true, send);
+ session.getExecution().completed(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Fri Dec 7 11:13:09 2007
@@ -31,12 +31,13 @@
const std::string empty;
}
-CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker() : closed(false) {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : closed(false), mark(m) {} // closed must start false here too: add() reads it and close() sets it
void CompletionTracker::close()
{
sys::Mutex::ScopedLock l(lock);
+ closed=true;
while (!listeners.empty()) {
Record r(listeners.front());
{
@@ -47,17 +48,18 @@
}
}
+
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
while (!listeners.empty() && !(listeners.front().id > mark)) {
Record r(listeners.front());
+ listeners.pop_front();
{
sys::Mutex::ScopedUnlock u(lock);
r.completed();
}
- listeners.pop_front();
}
}
@@ -88,14 +90,13 @@
bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (record.id < mark) {
+ if (record.id < mark || closed) {
return false;
} else {
//insert at the correct position
Listeners::iterator i = seek(record.id);
if (i == listeners.end()) i = listeners.begin();
listeners.insert(i, record);
-
return true;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Fri Dec 7 11:13:09 2007
@@ -60,7 +60,8 @@
};
typedef std::list<Record> Listeners;
-
+ bool closed;
+
sys::Mutex lock;
framing::SequenceNumber mark;
Listeners listeners;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Dec 7 11:13:09 2007
@@ -44,6 +44,8 @@
connector->setShutdownHandler(this);
}
+ConnectionImpl::~ConnectionImpl() { close(); }
+
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Dec 7 11:13:09 2007
@@ -62,6 +62,8 @@
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
+ ~ConnectionImpl();
+
void addSession(const boost::shared_ptr<SessionCore>&);
void open(const std::string& host, int port = 5672,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Dec 7 11:13:09 2007
@@ -51,7 +51,8 @@
{
}
-Connector::~Connector(){
+Connector::~Connector() {
+ close();
if (receiver.id() && receiver.id() != Thread::current().id())
receiver.join();
}
@@ -76,7 +77,6 @@
receiver = Thread(this);
}
-// Call with closedLock held
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Fri Dec 7 11:13:09 2007
@@ -88,6 +88,8 @@
void eof(qpid::sys::AsynchIO&);
friend class Channel;
+ friend class TestConnector;
+
public:
Connector(framing::ProtocolVersion pVersion,
bool debug = false, uint32_t buffer_size = 1024);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Dec 7 11:13:09 2007
@@ -52,8 +52,8 @@
: session(s), running(false)
{
queue = q.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(q);
+ session.getExecution().getDemux().getDefault() :
+ session.getExecution().getDemux().get(q);
}
void Dispatcher::start()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Dec 7 11:13:09 2007
@@ -167,6 +167,8 @@
out(frame);
}
+SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; }
+
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
{
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Dec 7 11:13:09 2007
@@ -80,6 +80,7 @@
framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
ResultListener=ResultListener());
+ framing::SequenceNumber lastSent() const;
void sendSyncRequest();
void sendFlushRequest();
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Fri Dec 7 11:13:09 2007
@@ -65,12 +65,12 @@
void acknowledge(Session_0_10& session, bool cumulative = true, bool send = true) const
{
- session.execution().completed(id, cumulative, send);
+ session.getExecution().completed(id, cumulative, send);
}
void acknowledge(bool cumulative = true, bool send = true) const
{
- const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+ const_cast<Session_0_10&>(session).getExecution().completed(id, cumulative, send);
}
/**@internal for incoming messages */
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp Fri Dec 7 11:13:09 2007
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionBase.h"
+
+namespace qpid {
+namespace client {
+using namespace framing;
+// Every operation below forwards to the SessionCore held in impl; none of them checks impl first.
+SessionBase::SessionBase() {} // impl is left default-constructed
+SessionBase::~SessionBase() {}
+SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} // wrap an existing core
+void SessionBase::suspend() { impl->suspend(); }
+void SessionBase::close() { impl->close(); }
+void SessionBase::setSynchronous(bool isSync) { impl->setSync(isSync); }
+bool SessionBase::isSynchronous() const { return impl->isSync(); }
+Execution& SessionBase::getExecution() { return impl->getExecution(); }
+Uuid SessionBase::getId() const { return impl->getId(); }
+framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
+}} // namespace qpid::client
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h Fri Dec 7 11:13:09 2007
@@ -0,0 +1,101 @@
+#ifndef QPID_CLIENT_SESSIONBASE_H
+#define QPID_CLIENT_SESSIONBASE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/amqp_structs.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Response.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/client/TypedResult.h"
+#include "qpid/shared_ptr.h"
+#include <string>
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+using framing::Uuid;
+
+/**
+ * Basic session operations that are not derived from AMQP XML methods.
+ * Public base of the generated Session_0_10 class; operations forward to the SessionCore in impl. */
+class SessionBase
+{
+ public:
+ SessionBase(); // leaves impl default-constructed; the operations below dereference impl unconditionally
+ ~SessionBase(); // NOTE(review): non-virtual - do not delete a derived session through a SessionBase*
+
+ /** Get the next message frame-set from the session. */
+ framing::FrameSet::shared_ptr get();
+
+ /** Get the session ID */
+ Uuid getId() const;
+
+ /**
+ * In synchronous mode, the session sets the sync bit on every
+ * command and waits for the broker's response before returning.
+ * Note this gives lower throughput than non-synchronous mode.
+ *
+ * In non-synchronous mode commands are sent without waiting
+ * for a response (you can use the returned Completion object
+ * to wait for completion.)
+ *
+ *@param isSync if true set the session to synchronous mode, else
+ * set it to non-synchronous mode.
+ */
+ void setSynchronous(bool isSync);
+ /** @return the sync flag reported by the underlying SessionCore (impl->isSync()). */
+ bool isSynchronous() const;
+
+ /**
+ * Suspend the session, can be resumed on a different connection.
+ * @see Connection::resume()
+ */
+ void suspend();
+
+ /** Close the session */
+ void close();
+ /** Access the Execution interface of the underlying SessionCore. */
+ Execution& getExecution();
+
+ typedef framing::TransferContent DefaultContent;
+
+ protected:
+ shared_ptr<SessionCore> impl; // every public operation forwards to this core
+ framing::ProtocolVersion version; // NOTE(review): not set or read in SessionBase.cpp
+ friend class Connection; // presumably so Connection can use the protected constructor - confirm
+ SessionBase(shared_ptr<SessionCore>); // wrap an existing SessionCore
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SESSIONBASE_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Dec 7 11:13:09 2007
@@ -203,8 +203,10 @@
// user thread
{
Lock l(state);
- if (state==OPEN)
- doSuspend(REPLY_SUCCESS, OK);
+ if (state==SUSPENDED) { // Clear error that caused suspend
+ code=REPLY_SUCCESS;
+ text=OK;
+ }
check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION);
SequenceNumber sendAck=session->resuming();
attaching(c);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Dec 7 11:13:09 2007
@@ -38,29 +38,30 @@
confirmMode(true), acquireMode(false)
{}
-void SubscriptionManager::subscribeInternal(
+Completion SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest,
arg::confirmMode=confirmMode, arg::acquireMode=acquireMode);
setFlowControl(dest, messages, bytes, window);
+ return c;
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- subscribeInternal(q, dest);
+ return subscribeInternal(q, dest);
}
-void SubscriptionManager::subscribe(
+Completion SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
- subscribeInternal(q, dest);
+ lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
+ return subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
@@ -91,7 +92,9 @@
session.messageCancel(dest);
}
-void SubscriptionManager::run(bool autoStop)
+void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManager::run()
{
dispatcher.setAutoStop(autoStop);
dispatcher.run();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Dec 7 11:13:09 2007
@@ -23,21 +23,24 @@
*/
#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
+#include <qpid/sys/Runnable.h>
+
#include <set>
#include <sstream>
namespace qpid {
namespace client {
-class SubscriptionManager
+class SubscriptionManager : public sys::Runnable
{
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest);
+ Completion subscribeInternal(const std::string& q, const std::string& dest);
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
@@ -47,8 +50,9 @@
AckPolicy autoAck;
bool confirmMode;
bool acquireMode;
-
-public:
+ bool autoStop;
+
+ public:
SubscriptionManager(Session_0_10& session);
/**
@@ -59,9 +63,9 @@
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Completion subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& tag=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -70,17 +74,21 @@
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
+ Completion subscribe(LocalQueue& localQueue,
const std::string& queue,
const std::string& tag=std::string());
/** Cancel a subscription. */
void cancel(const std::string tag);
- /** Deliver messages until stop() is called.
- *@param autoStop If true, return when all listeners are cancelled.
+ /** Deliver messages until stop() is called. */
+ void run();
+
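+ /** If set true, run() will stop when all subscriptions
+ * are cancelled. If false, run() will only stop when stop() is called.
+ * Intended default is true. NOTE(review): this change shows no initialiser for autoStop in the constructor - confirm.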
*/
- void run(bool autoStop=true);
+ void setAutoStop(bool set=true);
/** Cause run() to return */
void stop();
Added: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=602182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Dec 7 11:13:09 2007
@@ -0,0 +1,101 @@
+#ifndef TESTS_BROKERFIXTURE_H
+#define TESTS_BROKERFIXTURE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
+
+namespace qpid { namespace client {
+/** Back door into private Connector members; Connector.h declares this struct a friend. */
+struct TestConnector {
+ static void disconnect(qpid::client::Connector& c) {
+ c.socket.close(); // close the socket directly rather than going through Connector::close()
+ c.handleClosed(); // then invoke the Connector's own closed handling
+ }
+};
+}}
+
+/**
+ * A fixture to create an in-process broker and connect to it for tests.
+ * Constructing it starts the broker thread, opens a connection and creates a session. */
+struct BrokerFixture {
+ typedef qpid::broker::Broker Broker;
+ typedef boost::shared_ptr<Broker> BrokerPtr;
+
+ struct OpenConnection : public qpid::client::Connection {
+ OpenConnection(int port) { open("localhost", port); } // opens to localhost as soon as it is constructed
+ };
+ // Declaration order matters: the constructor initialises each member from the ones declared above it.
+ BrokerPtr broker;
+ qpid::sys::Thread brokerThread;
+ OpenConnection connection;
+ qpid::client::Session_0_10 session;
+ qpid::client::SubscriptionManager subs;
+ qpid::client::LocalQueue lq; // not named in the constructor's initialiser list, so default-constructed
+ /** Create a broker with port 0 and a single worker thread. */
+ BrokerPtr newBroker() {
+ Broker::Options opts;
+ opts.port=0; // presumably asks for any free port; the real port is read back via getPort() - confirm
+ opts.workerThreads=1;
+ BrokerPtr b=Broker::create(opts);
+ // TODO aconway 2007-12-05: Without the following line
+ // the test can hang in the connection ctor. This is
+ // a race condition that should be fixed.
+ b->getPort();
+ return b;
+ };
+
+ BrokerFixture() : broker(newBroker()),
+ brokerThread(*broker),
+ connection(broker->getPort()),
+ session(connection.newSession()),
+ subs(session)
+ {}
+ // Teardown order: close the client connection, shut the broker down, then wait for its thread.
+ ~BrokerFixture() {
+ connection.close();
+ broker->shutdown();
+ brokerThread.join();
+ }
+
+ /** Open a connection to the local broker */
+ void open(qpid::client::Connection& c) {
+ c.open("localhost", broker->getPort());
+ }
+
+ /** Close a connection's socket via TestConnector, without going through Connection::close() */
+ static void disconnect(qpid::client::Connection& c) {
+ struct Expose : public qpid::client::Connection { // local subclass used only to reach Connection's impl member
+ void disconnect() {
+ qpid::client::TestConnector::disconnect(*impl->getConnector());
+ }
+ };
+ static_cast<Expose&>(c).disconnect(); // NOTE(review): c is not really an Expose; relies on Expose adding no state
+ }
+};
+
+#endif /*!TESTS_BROKERFIXTURE_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Fri Dec 7 11:13:09 2007
@@ -20,7 +20,7 @@
*/
#include <vector>
#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Message.h"
#include "qpid/client/Queue.h"
@@ -44,7 +44,7 @@
* The test base defines the tests methods, derived classes
* instantiate the channel in Basic or Message mode.
*/
-class ChannelTestBase : public CppUnit::TestCase
+class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
{
struct Listener: public qpid::client::MessageListener {
vector<Message> messages;
@@ -56,7 +56,6 @@
}
};
- qpid::InProcessBrokerClient connection;
const std::string qname;
const std::string data;
Queue queue;
@@ -69,8 +68,7 @@
public:
ChannelTestBase()
- : connection(FRAME_MAX),
- qname("testq"), data("hello"),
+ : qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Dec 7 11:13:09 2007
@@ -19,7 +19,7 @@
*
*/
#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session_0_10.h"
#include "qpid/framing/TransferContent.h"
@@ -61,7 +61,7 @@
}
};
-class ClientSessionTest : public CppUnit::TestCase
+class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
CPPUNIT_TEST(testQueueQuery);
@@ -74,23 +74,14 @@
CPPUNIT_TEST_SUITE_END();
shared_ptr<broker::Broker> broker;
- Session_0_10 session;
- // Defer construction & thread creation to setUp
- boost::optional<InProcessConnection> c;
- boost::optional<InProcessConnection> c2;
-public:
+ public:
void setUp() {
broker = broker::Broker::create();
- c=boost::in_place<InProcessConnection>(broker);
- c2=boost::in_place<InProcessConnection>(broker);
}
void tearDown() {
- c2.reset();
- c.reset();
- broker.reset();
}
void declareSubscribe(const std::string& q="my-queue",
@@ -109,7 +100,7 @@
void testQueueQuery()
{
- session = c->newSession();
+ session =connection.newSession();
session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue"));
CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
@@ -120,7 +111,7 @@
void testTransfer()
{
- session = c->newSession();
+ session =connection.newSession();
declareSubscribe();
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
//get & test the message:
@@ -128,12 +119,12 @@
CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
//confirm receipt:
- session.execution().completed(msg->getId(), true, true);
+ session.getExecution().completed(msg->getId(), true, true);
}
void testDispatcher()
{
- session = c->newSession();
+ session =connection.newSession();
declareSubscribe();
TransferContent msg1("One");
@@ -161,16 +152,16 @@
}
void testResumeExpiredError() {
- session = c->newSession(0);
+ session =connection.newSession(0);
session.suspend(); // session has 0 timeout.
try {
- c->resume(session);
+ connection.resume(session);
CPPUNIT_FAIL("Expected InvalidArgumentException.");
} catch(const InternalErrorException&) {}
}
void testUseSuspendedError() {
- session = c->newSession(60);
+ session =connection.newSession(60);
session.suspend();
try {
session.exchangeQuery(name="amq.fanout");
@@ -179,26 +170,27 @@
}
void testSuspendResume() {
- session = c->newSession(60);
+ session =connection.newSession(60);
declareSubscribe();
session.suspend();
// Make sure we are still subscribed after resume.
- c->resume(session);
+ connection.resume(session);
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
}
void testDisconnectResume() {
- session = c->newSession(60);
+ session =connection.newSession(60);
session.queueDeclare(queue="before");
CPPUNIT_ASSERT(queueExists("before"));
- // Simulate lost frames.
- c->discard();
session.queueDeclare(queue=string("after"));
- c->disconnect(); // Simulate disconnect, resume on a new connection.
- c2->resume(session);
+ disconnect(connection);
+ Connection c2;
+ open(c2);
+ c2.resume(session);
CPPUNIT_ASSERT(queueExists("after"));
+ c2.close();
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Fri Dec 7 11:13:09 2007
@@ -20,7 +20,7 @@
*/
#include "unit_test.h"
-#include "InProcessBroker.h"
+#include "BrokerFixture.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -31,74 +31,74 @@
using namespace std;
using namespace qpid;
+using namespace sys;
using namespace client;
using namespace framing;
-struct Fixture {
- InProcessConnection connection;
- InProcessConnection connection2;
- Session_0_10 session;
- SubscriptionManager sub;
- LocalQueue q;
-
- Fixture() : connection(),
- connection2(connection.getBroker()),
- session(connection.newSession()),
- sub(session)
- {
- session.queueDeclare(arg::queue="q");
+using boost::bind;
+using boost::function;
+
+template <class Ex>
+struct Catcher : public Runnable {
+ function<void ()> f;
+ bool caught;
+ Thread thread;
+
+ Catcher(function<void ()> f_) : f(f_), caught(false), thread(this) {}
+ ~Catcher() { join(); }
+
+ void run() {
+ try { f(); }
+ catch(const Ex& e) {
+ caught=true;
+ BOOST_MESSAGE(e.what());
+ }
+ catch(const std::exception& e) {
+ BOOST_ERROR(string("Bad exception: ")+e.what());
+ }
+ catch(...) {
+ BOOST_ERROR(string("Bad exception: unknown"));
+ }
}
-};
+ bool join() {
+ if (thread.id()) {
+ thread.join();
+ thread=Thread();
+ }
+ return caught;
+ }
+};
-// TODO aconway 2007-11-30: need InProcessBroker to be a more accurate
-// simulation of shutdown behaviour. It should override only
-// Connector.run() to substitute NetworkQueues for the Dispatcher.
-//
-// template <class Ex>
-// struct Catcher : public sys::Runnable {
-// Session_0_10 s;
-// boost::function<void ()> f;
-// bool caught;
-// Catcher(Session_0_10 s_, boost::function<void ()> f_)
-// : s(s_), f(f_), caught(false) {}
-// void run() {
-// try { f(); } catch(const Ex& e) {
-// caught=true;
-// BOOST_MESSAGE(e.what());
-// }
-// }
-// };
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, Fixture) {
-// Catcher<Exception> get(session, boost::bind(&Session_0_10::get, session));
-// sub.subscribe(q, "q");
-// sys::Thread t(get);
-// connection.disconnect();
-// t.join();
-// BOOST_CHECK(get.caught);
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
+ Catcher<ClosedException> get(bind(&Session_0_10::get, session));
+ disconnect(connection);
+ BOOST_CHECK(get.join());
+}
+
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(lq, "q");
+ Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+ disconnect(connection);
+ BOOST_CHECK(pop.join());
+}
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, Fixture) {
+// FIXME aconway 2007-12-07: This test hangs sporadically at t.join
+// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
// struct NullListener : public MessageListener {
// void received(Message&) { BOOST_FAIL("Unexpected message"); }
// } l;
-// sub.subscribe(l, "q");
-// connection.disconnect();
-// try {
-// sub.run();
-// BOOST_FAIL("Expected exception");
-// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
-// try {
-// session.queueDeclare(arg::queue="foo");
-// BOOST_FAIL("Expected exception");
-// } catch (const Exception&e) { BOOST_FAIL(e.what()); }
+// session.queueDeclare(arg::queue="q");
+// subs.subscribe(l, "q");
+// Thread t(subs);
+// disconnect(connection);
+// t.join();
+// BOOST_CHECK_THROW(session.close(), InternalErrorException);
// }
-// TODO aconway 2007-11-30: setSynchronous appears not to work.
-// BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, Fixture) {
-// session.setSynchronous(true);
-// BOOST_CHECK_THROW(sub.subscribe(q, "no such queue"), NotFoundException);
-// }
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+ BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
+}
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=602182&r1=602181&r2=602182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Fri Dec 7 11:13:09 2007
@@ -205,7 +205,7 @@
}
}
// Make sure this is all completed before we return.
- session.execution().sendSyncRequest();
+ session.getExecution().sendSyncRequest();
}
};
@@ -231,13 +231,9 @@
// Functor to collect rates.
void operator()(const string& data) {
- try {
- double d=lexical_cast<double>(data);
- values.push_back(d);
- sum += d;
- } catch (...) {
- throw Exception(QPID_MSG("Bad data, expecting double: " << data));
- }
+ double d=lexical_cast<double>(data);
+ values.push_back(d);
+ sum += d;
}
double mean() const {