You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/29 17:13:27 UTC
svn commit: r501087 [1/2] - in /incubator/qpid/branches/qpid.0-9: cpp/
cpp/gen/ cpp/lib/broker/ cpp/lib/client/ cpp/lib/common/framing/
cpp/lib/common/sys/apr/ cpp/tests/ gentools/src/org/apache/qpid/gentools/
gentools/templ.cpp/
Author: aconway
Date: Mon Jan 29 08:13:24 2007
New Revision: 501087
URL: http://svn.apache.org/viewvc?view=rev&rev=501087
Log:
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecssary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
- MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template.
- Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.
Added:
incubator/qpid/branches/qpid.0-9/cpp/gen/make-gen-src-mk.sh
- copied unchanged from r501057, incubator/qpid/trunk/qpid/cpp/gen/make-gen-src-mk.sh
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h (with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/bootstrap
incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQMethodBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolInitiation.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/amqp_types.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRSocket.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/apr/APRSocket.h
incubator/qpid/branches/qpid.0-9/cpp/tests/TxBufferTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/CppGenerator.java
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
Modified: incubator/qpid/branches/qpid.0-9/cpp/bootstrap
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/bootstrap?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/bootstrap (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/bootstrap Mon Jan 29 08:13:24 2007
@@ -31,7 +31,8 @@
autoconf
if [ "$1" = "-build" ] ; then
- ./configure
+ shift
+ ./configure "$@"
make
make check
fi
Modified: incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/gen/Makefile.am Mon Jan 29 08:13:24 2007
@@ -6,7 +6,7 @@
# Distribute the generated sources, at least for now, since
# the generator code is in java.
EXTRA_DIST = $(BUILT_SOURCES)
-MAINTAINERCLEANFILES = $(BUILT_SOURCES)
+DISTCLEANFILES = $(BUILT_SOURCES) timestamp gen-src.mk
# Don't attempt to run the code generator unless configure has set
# CAN_GENERATE_CODE, indicating that the amqp.xml and tools needed
@@ -31,21 +31,7 @@
-c -o . -t $(gentools_dir)/templ.cpp $(spec)
touch timestamp
-DISTCLEANFILES = gen-src.mk
gen-src.mk: timestamp
- ( echo 'generated_sources = '\\ \
- && ls *.cpp | sort -u | sed 's/.*/ & \\/;$$s/ \\//'; \
- echo 'generated_headers = '\\ \
- && ls *.h | sort -u | sed 's/.*/ & \\/;$$s/ \\//'; \
- ) > $@-t
- ( echo if CAN_GENERATE_CODE; \
- echo 'java_sources = '\\ \
- && find $(gentools_srcdir) -name '*.java' \
- | sort -u | sed 's/.*/ & \\/;$$s/ \\//'; \
- echo 'cxx_templates = '\\ \
- && find $(gentools_dir)/templ.cpp -name '*.tmpl' \
- | sort -u | sed 's/.*/ & \\/;$$s/ \\//'; \
- echo endif \
- ) >> $@-t
+ ./make-gen-src-mk.sh $(gentools_dir) $(gentools_srcdir) > $@-t
mv $@-t $@
endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Mon Jan 29 08:13:24 2007
@@ -212,22 +212,21 @@
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
- // FIXME aconway 2007-01-17: Assertions on all channel methods,
- assertChannelNonZero(channel.getId());
- if (channel.isOpen())
- throw ConnectionException(504, "Channel already open");
channel.open();
- // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
+ // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
connection.client->getChannel().openOk(context, std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/){
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+ const MethodContext& context, u_int16_t /*replyCode*/,
+ const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
connection.client->getChannel().closeOk(context);
- // FIXME aconway 2007-01-18: Following line destroys this. Ugly.
+ // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
@@ -499,13 +498,12 @@
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :
- ChannelAdapter(c.getOutput(), ch->getId()),
channel(ch),
connection(c),
broker(b),
serverOps(new ServerOps(*ch,c,b))
{
- assert(ch);
+ init(ch->getId(), c.getOutput(), ch->getVersion());
}
void BrokerAdapter::handleMethodInContext(
@@ -544,6 +542,9 @@
}
+bool BrokerAdapter::isOpen() const {
+ return channel->isOpen();
+}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.h Mon Jan 29 08:13:24 2007
@@ -52,6 +52,8 @@
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+ bool isOpen() const;
+
private:
void handleMethodInContext(
boost::shared_ptr<qpid::framing::AMQMethodBody> method,
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Mon Jan 29 08:13:24 2007
@@ -108,6 +108,7 @@
MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
~Channel();
bool isOpen() const { return opened; }
+ const framing::ProtocolVersion& getVersion() const { return version; }
void open() { opened = true; }
u_int16_t getId() const { return id; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Connection.cpp Mon Jan 29 08:13:24 2007
@@ -77,6 +77,7 @@
MethodContext(0, &getAdapter(0)),
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
+ getAdapter(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Mon Jan 29 08:13:24 2007
@@ -32,7 +32,7 @@
public:
virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
virtual u_int32_t size() = 0;
- virtual void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+ virtual void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
virtual void encode(qpid::framing::Buffer& buffer) = 0;
virtual void destroy() = 0;
virtual ~Content(){}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.cpp Mon Jan 29 08:13:24 2007
@@ -39,7 +39,7 @@
return sum;
}
-void InMemoryContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void InMemoryContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
{
for (content_iterator i = content.begin(); i != content.end(); i++) {
if ((*i)->size() > framesize) {
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/InMemoryContent.h Mon Jan 29 08:13:24 2007
@@ -35,7 +35,7 @@
public:
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~InMemoryContent(){}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.cpp Mon Jan 29 08:13:24 2007
@@ -37,7 +37,7 @@
return 0;//all content is written as soon as it is added
}
-void LazyLoadedContent::send(qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
+void LazyLoadedContent::send(const qpid::framing::ProtocolVersion& version, OutputHandler* out, int channel, u_int32_t framesize)
{
if (expectedSize > framesize) {
for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/LazyLoadedContent.h Mon Jan 29 08:13:24 2007
@@ -34,7 +34,7 @@
LazyLoadedContent(MessageStore* const store, Message* const msg, u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
- void send(qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void send(const qpid::framing::ProtocolVersion& version, qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
void destroy();
~LazyLoadedContent(){}
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp?view=auto&rev=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp Mon Jan 29 08:13:24 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AMQP_ClientOperations.h"
+#include "ClientAdapter.h"
+#include "Connection.h"
+#include "Exception.h"
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+void ClientAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*clientOps, context);
+ }catch(ChannelException& e){
+ connection.client->getChannel().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ context, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+ channel->handleHeader(body);
+}
+
+void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) {
+ channel->handleContent(body);
+}
+
+void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+ // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
+}} // namespace qpid::client
+
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h?view=auto&rev=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h Mon Jan 29 08:13:24 2007
@@ -0,0 +1,66 @@
+#ifndef _client_ClientAdapter_h
+#define _client_ClientAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ChannelAdapter.h"
+#include "ClientChannel.h"
+
+namespace qpid {
+namespace client {
+
+class AMQMethodBody;
+class Connection;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Client objects.
+ *
+ * Owns a channel, has references to Connection and Client.
+ */
+class ClientAdapter : public framing::ChannelAdapter
+{
+ public:
+ ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&);
+ Channel& getChannel() { return *channel; }
+
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+ private:
+ void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
+ class ClientOps;
+
+ std::auto_ptr<Channel> channel;
+ Connection& connection;
+ Client& client;
+ boost::shared_ptr<ClientOps> clientOps;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_ClientAdapter_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientAdapter.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Mon Jan 29 08:13:24 2007
@@ -23,42 +23,115 @@
#include <ClientMessage.h>
#include <QpidError.h>
#include <MethodBodyInstances.h>
+#include "Connection.h"
+
+// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
+// handling of errors that should close the connection or the channel.
+// Make sure the user thread receives a connection in each case.
+//
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+const std::string Channel::OK("OK");
+
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- out(0),
+ connection(0),
incoming(0),
- closed(true),
prefetch(_prefetch),
- transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
+ transactional(_transactional)
{ }
Channel::~Channel(){
- stop();
+ close();
+}
+
+void Channel::open(ChannelId id, Connection& con)
+{
+ if (isOpen())
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
+ connection = &con;
+ init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ string oob;
+ if (id != 0)
+ sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+}
+
+void Channel::protocolInit(
+ const std::string& uid, const std::string& pwd, const std::string& vhost) {
+ assert(connection);
+ responses.expect();
+ connection->connector->init(); // Send ProtocolInit block.
+ responses.receive<ConnectionStartBody>();
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ // TODO aconway 2007-01-26: Move client over to proxy model,
+ // symmetric with server.
+ ConnectionTuneBody::shared_ptr proposal =
+ sendAndReceive<ConnectionTuneBody>(
+ new ConnectionStartOkBody(
+ version, props, mechanism, response, locale));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ connection->send(
+ new AMQFrame(
+ version, 0,
+ new ConnectionTuneOkBody(
+ version, proposal->getChannelMax(),
+ connection->getMaxFrameSize(),
+ proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connection->connector->setReadTimeout(heartbeat * 2);
+ connection->connector->setWriteTimeout(heartbeat);
+
+ // Send connection open.
+ std::string capabilities;
+ responses.expect();
+ send(new AMQFrame(
+ version, 0,
+ new ConnectionOpenBody(version, vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now
+ //esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate<ConnectionOpenOkBody>()) {
+ //ok
+ }else if(responses.validate<ConnectionRedirectBody>()){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(
+ shared_polymorphic_downcast<ConnectionRedirectBody>(
+ responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost()
+ << std::endl;
+ } else {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
}
+
+bool Channel::isOpen() const { return connection; }
void Channel::setPrefetch(u_int16_t _prefetch){
prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
+ setQos();
}
void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(version, 0, prefetch, false));
if(transactional){
- sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
}
}
@@ -66,62 +139,51 @@
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_declare_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeclareOkBody>(
+ synch,
+ new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeleteOkBody>(
+ synch,
+ new ExchangeDeleteBody(version, 0, name, false, !synch));
}
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false,
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_declare_ok);
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false, false,
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if (synch) {
if(queue.getName().length() == 0){
QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ shared_polymorphic_downcast<QueueDeclareOkBody>(
+ responses.getResponse());
queue.setName(response->getQueue());
}
- }else{
- out->send(frame);
}
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueDeleteOkBody>(
+ synch,
+ new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_bind_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueBindOkBody>(
+ synch,
+ new QueueBindBody(version, 0, q, e, key,!synch, args));
}
void Channel::consume(
@@ -129,52 +191,48 @@
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
string q = queue.getName();
- AMQFrame* frame =
- new AMQFrame(version,
- id,
- new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if (synch) {
+ BasicConsumeOkBody::shared_ptr response =
+ shared_polymorphic_downcast<BasicConsumeOkBody>(
+ responses.getResponse());
tag = response->getConsumerTag();
- }else{
- out->send(frame);
}
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
}
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
- }
-
- AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
+void Channel::cancel(const std::string& tag, bool synch) {
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end()) {
+ Consumer& c = i->second;
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
+ consumers.erase(tag);
}
}
void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ while(!consumers.empty()) {
+ Consumer c = consumers.begin()->second;
+ consumers.erase(consumers.begin());
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ // Let exceptions propagate, if one fails no point
+ // trying the rest. NB no memory leaks if we do,
+ // ConsumerMap holds values, not pointers.
+ //
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
- consumers.erase(i);
- delete c;
}
}
@@ -191,26 +249,28 @@
retrieved = 0;
}
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
+ AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode));
responses.expect();
- out->send(frame);
+ send(body);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
- if(method_bodies.basic_get_ok.match(response.get())){
+ if(response->isA<BasicGetOkBody>()) {
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
+ // FIXME aconway 2007-01-26: close the connection? the channel?
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
}
retrieve(msg);
return true;
- }if(method_bodies.basic_get_empty.match(response.get())){
+ }if(response->isA<BasicGetEmptyBody>()){
return false;
}else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ // FIXME aconway 2007-01-26: must close the connection.
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
}
}
@@ -219,25 +279,24 @@
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(version, id, body));
+ send(msg.header);
u_int64_t data_length = data.length();
if(data_length > 0){
- u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
if(data_length < frag_size){
- out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+ send(new AMQContentBody(data));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
+ send(new AMQContentBody(frag));
offset += length;
remaining = data_length - offset;
@@ -247,56 +306,48 @@
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
- sendAndReceive(frame, method_bodies.tx_commit_ok);
+ sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
- sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-
-void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
+ sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
}
-void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+void Channel::handleMethodInContext(
+ AMQMethodBody::shared_ptr body, const MethodContext&)
+{
+ //channel.flow, channel.close, basic.deliver, basic.return or a
+ //response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(method_bodies.basic_deliver.match(body.get())){
+ }else if(body->isA<BasicDeliverBody>()) {
if(incoming != 0){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
}
- }else if(method_bodies.basic_return.match(body.get())){
+ }else if(body->isA<BasicReturnBody>()){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
}
- }else if(method_bodies.channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(method_bodies.channel_flow.match(body.get())){
-
- }else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ }else if(body->isA<ChannelCloseBody>()){
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+ }else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24:
+ }else if(body->isA<ConnectionCloseBody>()){
+ connection->close();
+ }else{
+ connection->close(
+ 504, "Unrecognised method",
+ body->amqpClassId(), body->amqpMethodId());
}
}
-
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
if(incoming == 0){
//handle invalid frame sequence
@@ -331,27 +382,16 @@
dispatcher = Thread(this);
}
-void Channel::stop(){
- {
- Monitor::ScopedLock l(dispatchMonitor);
- closed = true;
- responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
- }
- dispatcher.join();
-}
-
void Channel::run(){
dispatch();
}
void Channel::enqueue(){
+ Monitor::ScopedLock l(retrievalMonitor);
if(incoming->isResponse()){
- Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
retrievalMonitor.notify();
}else{
- Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
dispatchMonitor.notify();
}
@@ -360,7 +400,7 @@
IncomingMessage* Channel::dequeue(){
Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && !closed){
+ while(messages.empty() && isOpen()){
dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
@@ -371,25 +411,25 @@
return msg;
}
-void Channel::deliver(Consumer* consumer, Message& msg){
+void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
//allow registered listener to handle the message
- consumer->listener->received(msg);
+ consumer.listener->received(msg);
//if the handler calls close on the channel or connection while
//handling this message, then consumer will now have been deleted.
- if(!closed){
+ if(isOpen()){
bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
+ switch(consumer.ackMode){
+ case LAZY_ACK:
multiple = true;
- if(++(consumer->count) < prefetch) break;
+ if(++(consumer.count) < prefetch) break;
//else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
+ case AUTO_ACK:
+ send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+ consumer.lastDeliveryTag = 0;
}
}
@@ -399,7 +439,7 @@
}
void Channel::dispatch(){
- while(!closed){
+ while(isOpen()){
IncomingMessage* incomingMsg = dequeue();
if(incomingMsg){
//Note: msg is currently only valid for duration of this call
@@ -416,12 +456,10 @@
msg.deliveryTag = incomingMsg->getDeliveryTag();
std::string tag = incomingMsg->getConsumerTag();
- if(consumers[tag] == 0){
- //signal error
+ if(consumers.find(tag) == consumers.end())
std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
+ else
deliver(consumers[tag], msg);
- }
}
delete incomingMsg;
}
@@ -432,14 +470,60 @@
returnsHandler = handler;
}
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
+// Close called by local application.
+void Channel::close(
+ u_int16_t code, const std::string& text,
+ ClassId classId, MethodId methodId)
+{
+ // FIXME aconway 2007-01-26: Locking?
+ if (getId() != 0 && isOpen()) {
+ try {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(version, code, text, classId, methodId));
+ cancelAll();
+ closeInternal();
+ } catch (...) {
+ closeInternal();
+ throw;
+ }
+ }
}
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
+// Channel closed by peer.
+void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+ assert(isOpen());
+ closeInternal();
+ // FIXME aconway 2007-01-26: How to throw the proper exception
+ // to the application thread?
+}
+
+void Channel::closeInternal() {
+ assert(isOpen());
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ connection = 0;
+ // A 0 response means we are closed.
+ responses.signalResponse(AMQMethodBody::shared_ptr());
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
+
+void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+{
+ responses.expect();
+ send(toSend);
+ responses.receive(c, m);
+}
+
+void Channel::sendAndReceiveSync(
+ bool sync, AMQBody* body, ClassId c, MethodId m)
+{
+ if(sync)
+ sendAndReceive(body, c, m);
+ else
+ send(body);
+}
+
+
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Mon Jan 29 08:13:24 2007
@@ -27,7 +27,6 @@
#include "sys/types.h"
#include <framing/amqp_framing.h>
-#include <Connection.h>
#include <ClientExchange.h>
#include <IncomingMessage.h>
#include <ClientMessage.h>
@@ -35,86 +34,126 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <ReturnedMessageHandler.h>
+#include "Runnable.h"
+#include "ChannelAdapter.h"
+#include "Thread.h"
namespace qpid {
+namespace framing {
+class ChannelCloseBody;
+}
+
namespace client {
- /**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
- enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
+
+class Connection;
+
+/**
+ * The available acknowledgements modes
+ *
+ * \ingroup clientapi
+ */
+enum ack_modes {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+/**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ *
+ * \ingroup clientapi
+ */
+class Channel : public framing::ChannelAdapter,
+ public sys::Runnable
+{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
};
+ typedef std::map<std::string, Consumer> ConsumerMap;
+ static const std::string OK;
+
+ Connection* connection;
+ sys::Thread dispatcher;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ sys::Monitor dispatchMonitor;
+ sys::Monitor retrievalMonitor;
+ ConsumerMap consumers;
+ ReturnedMessageHandler* returnsHandler;
+
+ u_int16_t prefetch;
+ const bool transactional;
+ framing::ProtocolVersion version;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void deliver(Consumer& consumer, Message& msg);
+
+ void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+
+ void handleMethodInContext(
+ framing::AMQMethodBody::shared_ptr,
+ const framing::MethodContext& method);
+ void setQos();
+ void cancelAll();
+
+ void protocolInit(
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost);
+
+ void sendAndReceive(
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
+
+ void sendAndReceiveSync(
+ bool sync,
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
+
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ return boost::shared_polymorphic_downcast<BodyType>(
+ responses.getResponse());
+ }
+
+ template <class BodyType> void sendAndReceiveSync(
+ bool sync, framing::AMQBody* body) {
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ }
+
+ void open(framing::ChannelId, Connection&);
+ void closeInternal();
+ void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+
+ friend class Connection;
+
+ public:
+ bool isOpen() const;
+
/**
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- * \ingroup clientapi
- */
- class Channel : private virtual framing::BodyHandler,
- public virtual sys::Runnable
- {
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string,Consumer*>::iterator consumer_iterator;
-
- u_int16_t id;
- Connection* con;
- sys::Thread dispatcher;
- framing::OutputHandler* out;
- IncomingMessage* incoming;
- ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- sys::Monitor dispatchMonitor;
- sys::Monitor retrievalMonitor;
- std::map<std::string, Consumer*> consumers;
- ReturnedMessageHandler* returnsHandler;
- bool closed;
-
- u_int16_t prefetch;
- const bool transactional;
- framing::ProtocolVersion version;
-
- void enqueue();
- void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
- void stop();
- void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
- void deliver(Consumer* consumer, Message& msg);
- void setQos();
- void cancelAll();
-
- virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleRequest(framing::AMQRequestBody::shared_ptr);
- void handleResponse(framing::AMQResponseBody::shared_ptr);
-
- public:
- /**
* Creates a channel object.
*
* @param transactional if true, the publishing and acknowledgement
@@ -124,199 +163,202 @@
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- */
+ */
Channel(bool transactional = false, u_int16_t prefetch = 500);
~Channel();
- /**
- * Declares an exchange.
- *
- * In AMQP Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange
- *
- * @param exchange an Exchange object representing the exchange to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue
- *
- * @param queue a Queue object representing the queue to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue
- *
- * @param queue a Queue object representing the queue to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to
- *
- * @param queue a Queue object representing the queue to be
- * bound
- *
- * @param key the 'routing key' for the binding
- *
- * @param args the 'binding arguments' for the binding
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
+ /**
+ * Declares an exchange.
+ *
+ * In AMQP Exchanges are the destinations to which messages
+ * are published. They have Queues bound to them and route
+ * messages they receive to those queues. The routing rules
+ * depend on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Deletes an exchange
+ *
+ * @param exchange an Exchange object representing the exchange to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Declares a Queue
+ *
+ * @param queue a Queue object representing the queue to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareQueue(Queue& queue, bool synch = true);
+ /**
+ * Deletes a Queue
+ *
+ * @param queue a Queue object representing the queue to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ /**
+ * Binds a queue to an exchange. The exact semantics of this
+ * (in particular how 'routing keys' and 'binding arguments'
+ * are used) depends on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to bind to
+ *
+ * @param queue a Queue object representing the queue to be
+ * bound
+ *
+ * @param key the 'routing key' for the binding
+ *
+ * @param args the 'binding arguments' for the binding
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const framing::FieldTable& args, bool synch = true);
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see ack_modes
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * Transactionailty of a channel is specified when the channel
- * object is created (@see Channel()).
- */
- void commit();
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * ppblished messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
-
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(u_int16_t prefetch);
-
- /**
- * Start message dispatching on a new thread
- */
- void start();
- /**
- * Do message dispatching on this thread
- */
- void run();
-
- /**
- * Closes a channel, stopping any message dispatching.
- */
- void close();
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * ack_modes)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
- friend class Connection;
- };
+ /**
+ * For a transactional channel this will commit all
+ * publications and acknowledgements since the last commit (or
+ * the channel was opened if there has been no previous
+ * commit). This will cause published messages to become
+ * available to consumers and acknowledged messages to be
+ * consumed and removed from the queues they were dispatched
+ * from.
+ *
+ * Transactionailty of a channel is specified when the channel
+ * object is created (@see Channel()).
+ */
+ void commit();
+ /**
+ * For a transactional channel, this will rollback any
+ * publications or acknowledgements. It will be as if the
+ * ppblished messages were never sent and the acknowledged
+ * messages were never consumed.
+ */
+ void rollback();
+
+ /**
+ * Change the prefetch in use.
+ */
+ void setPrefetch(u_int16_t prefetch);
+
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
+
+ // TODO aconway 2007-01-26: Can it be private?
+ /**
+ * Dispatch messages on this channel in the calling thread.
+ */
+ void run();
+
+ /**
+ * Close the channel with optional error information.
+ * Closing a channel that is not open has no effect.
+ */
+ void close(
+ framing::ReplyCode = 200, const std::string& =OK,
+ framing::ClassId = 0, framing::MethodId = 0);
+
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+};
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Mon Jan 29 08:13:24 2007
@@ -18,35 +18,46 @@
* under the License.
*
*/
+#include <boost/format.hpp>
+
#include <Connection.h>
#include <ClientChannel.h>
#include <ClientMessage.h>
#include <QpidError.h>
#include <iostream>
+#include <sstream>
#include <MethodBodyInstances.h>
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::sys;
-u_int16_t Connection::channelIdCounter;
+
+namespace qpid {
+namespace client {
+
+ChannelId Connection::channelIdCounter;
+
+const std::string Connection::OK("OK");
Connection::Connection(
bool debug, u_int32_t _max_frame_size,
- qpid::framing::ProtocolVersion* _version
+ const framing::ProtocolVersion& _version
) : max_frame_size(_max_frame_size), closed(true),
- version(_version->getMajor(),_version->getMinor())
+ version(_version)
{
- connector = new Connector(
- version, requester, responder, debug, _max_frame_size);
+ connector = new Connector(version, debug, _max_frame_size);
}
Connection::~Connection(){
delete connector;
}
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+void Connection::open(
+ const std::string& _host, int _port, const std::string& uid,
+ const std::string& pwd, const std::string& virtualhost)
+{
+
host = _host;
port = _port;
connector->setInputHandler(this);
@@ -55,197 +66,69 @@
out = connector->getOutputHandler();
connector->connect(host, port);
- ProtocolInitiation* header = new ProtocolInitiation(version);
- responses.expect();
- connector->init(header);
- responses.receive(method_bodies.connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(method_bodies.connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(method_bodies.connection_open_ok)){
- //ok
- }else if(responses.validate(method_bodies.connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
-
+ // Open the special channel 0.
+ channels[0] = &channel0;
+ channel0.open(0, *this);
+ channel0.protocolInit(uid, pwd, virtualhost);
}
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+void Connection::close(
+ ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+ if(!closed) {
+ channel0.sendAndReceive<ConnectionCloseOkBody>(
+ new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId));
connector->close();
}
}
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
-}
-
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
+// FIXME aconway 2007-01-26: make channels owned and created by connection?
+void Connection::openChannel(Channel& channel) {
+ ChannelId id = ++channelIdCounter;
+ assert (channels.find(id) == channels.end());
+ assert(out);
+ channels[id] = &channel;
+ channel.open(id, *this);
}
-void Connection::received(AMQFrame* frame){
- AMQBody::shared_ptr body = frame->getBody();
- u_int8_t type = body->type();
- if (type == REQUEST_BODY)
- responder.received(AMQRequestBody::getData(body));
- handleFrame(frame);
- if (type == RESPONSE_BODY)
- requester.processed(AMQResponseBody::getData(body));
-}
-
-void Connection::handleFrame(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
- }
+void Connection::erase(ChannelId id) {
+ channels.erase(id);
}
-void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+void Connection::received(AMQFrame* frame){
+ // FIXME aconway 2007-01-25: Mutex
+ ChannelId id = frame->getChannel();
+ Channel* channel = channels[id];
+ // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+ // client. Need to review use of exceptions.
+ if (channel == 0)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR+504,
+ (boost::format("Invalid channel number %g") % id).str());
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(const qpid::QpidError& e){
+ channelException(
+ *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
}
}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
+void Connection::send(AMQFrame* frame) {
out->send(frame);
- responses.receive(body);
}
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+void Connection::channelException(
+ Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+ int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
+ if(method == 0)
+ channel.close(code, msg);
+ else
+ channel.close(
+ code, msg, method->amqpClassId(), method->amqpMethodId());
}
void Connection::idleIn(){
@@ -259,9 +142,12 @@
void Connection::shutdown(){
closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
+ //close all channels, also removes them from the map.
+ while(!channels.empty()){
+ Channel* channel = channels.begin()->second;
+ if (channel != 0)
+ channel->close();
}
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
+
+}} // namespace qpid::client
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=501087&r1=501086&r2=501087
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Mon Jan 29 08:13:24 2007
@@ -1,3 +1,6 @@
+#ifndef _Connection_
+#define _Connection_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,15 +23,15 @@
*/
#include <map>
#include <string>
+#include <boost/shared_ptr.hpp>
-#ifndef _Connection_
-#define _Connection_
-
+#include "amqp_types.h"
#include <QpidError.h>
#include <Connector.h>
#include <sys/ShutdownHandler.h>
#include <sys/TimeoutHandler.h>
+#include "framing/amqp_types.h"
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
#include <IncomingMessage.h>
@@ -37,150 +40,152 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <AMQP_HighestVersion.h>
-#include "Requester.h"
-#include "Responder.h"
+#include "ClientChannel.h"
namespace qpid {
+/**
+ * The client namespace contains all classes that make up a client
+ * implementation of the AMQP protocol. The key classes that form
+ * the basis of the client API to be used by applications are
+ * Connection and Channel.
+ */
+namespace client {
+
+class Channel;
+
+/**
+ * \internal provide access to selected private channel functions
+ * for the Connection without making it a friend of the entire channel.
+ */
+class ConnectionForChannel :
+ public framing::InputHandler,
+ public framing::OutputHandler,
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
+
+{
+ private:
+ friend class Channel;
+ virtual void erase(framing::ChannelId) = 0;
+};
+
+
+/**
+ * \defgroup clientapi Application API for an AMQP client
+ */
+
+/**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then opening one or
+ * more Channels over that connection.
+ *
+ * \ingroup clientapi
+ */
+class Connection : public ConnectionForChannel
+{
+ typedef std::map<framing::ChannelId, Channel*> ChannelMap;
+
+ static framing::ChannelId channelIdCounter;
+ static const std::string OK;
+
+ std::string host;
+ int port;
+ const u_int32_t max_frame_size;
+ ChannelMap channels;
+ Connector* connector;
+ framing::OutputHandler* out;
+ volatile bool closed;
+ framing::ProtocolVersion version;
+
+ void erase(framing::ChannelId);
+ void channelException(
+ Channel&, framing::AMQMethodBody*, const QpidError&);
+ Channel channel0;
+
+ // TODO aconway 2007-01-26: too many friendships, untagle these classes.
+ friend class Channel;
+
+ public:
+ const framing::ProtocolVersion& getVersion() const { return version; }
+
+ /**
+ * Creates a connection object, but does not open the
+ * connection.
+ *
+ * @param _version the version of the protocol to connect with
+ *
+ * @param debug turns on tracing for the connection
+ * (i.e. prints details of the frames sent and received to std
+ * out). Optional and defaults to false.
+ *
+ * @param max_frame_size the maximum frame size that the
+ * client will accept. Optional and defaults to 65536.
+ */
+ Connection(
+ bool debug = false, u_int32_t max_frame_size = 65536,
+ const framing::ProtocolVersion& = framing::highestProtocolVersion);
+ ~Connection();
+
/**
- * The client namespace contains all classes that make up a client
- * implementation of the AMQP protocol. The key classes that form
- * the basis of the client API to be used by applications are
- * Connection and Channel.
+ * Opens a connection to a broker.
+ *
+ * @param host the host on which the broker is running
+ *
+ * @param port the port on the which the broker is listening
+ *
+ * @param uid the userid to connect with
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text)
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
*/
-namespace client {
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
- class Channel;
/**
- * \defgroup clientapi Application API for an AMQP client
+ * Close the connection with optional error information for the peer.
+ *
+ * Any further use of this connection (without reopening it) will
+ * not succeed.
*/
+ void close(framing::ReplyCode=200, const std::string& msg=OK,
+ framing::ClassId = 0, framing::MethodId = 0);
/**
- * Represents a connection to an AMQP broker. All communication is
- * initiated by establishing a connection, then opening one or
- * more Channels over that connection.
+ * Associate a Channel with this connection and open it for use.
+ *
+ * In AMQP channels are like multi-plexed 'sessions' of work over
+ * a connection. Almost all the interaction with AMQP is done over
+ * a channel.
*
- * \ingroup clientapi
+ * @param connection the connection object to be associated with
+ * the channel. Call Channel::close() to close the channel.
+ */
+ void openChannel(Channel&);
+
+
+ // TODO aconway 2007-01-26: can these be private?
+ void send(framing::AMQFrame*);
+ void received(framing::AMQFrame*);
+ void idleOut();
+ void idleIn();
+ void shutdown();
+
+ /**
+ * @return the maximum frame size in use on this connection
*/
- class Connection : public virtual qpid::framing::InputHandler,
- public virtual qpid::sys::TimeoutHandler,
- public virtual qpid::sys::ShutdownHandler,
- private virtual qpid::framing::BodyHandler
- {
-
- typedef std::map<int, Channel*>::iterator iterator;
-
- static u_int16_t channelIdCounter;
-
- std::string host;
- int port;
- const u_int32_t max_frame_size;
- std::map<int, Channel*> channels;
- Connector* connector;
- qpid::framing::OutputHandler* out;
- ResponseHandler responses;
- volatile bool closed;
- framing::ProtocolVersion version;
- framing::Requester requester;
- framing::Responder responder;
-
- void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e);
- void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
- void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
-
- // FIXME aconway 2007-01-19: Use channel(0) not connection
- // to handle channel 0 requests. Remove handler methods.
- //
- void handleRequest(framing::AMQRequestBody::shared_ptr);
- void handleResponse(framing::AMQResponseBody::shared_ptr);
- void handleMethod(framing::AMQMethodBody::shared_ptr);
- void handleHeader(framing::AMQHeaderBody::shared_ptr);
- void handleContent(framing::AMQContentBody::shared_ptr);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr);
- void handleFrame(framing::AMQFrame* frame);
-
- public:
- /**
- * Creates a connection object, but does not open the
- * connection.
- *
- * @param _version the version of the protocol to connect with
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional and defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional and defaults to 65536.
- */
- Connection( bool debug = false, u_int32_t max_frame_size = 65536,
- framing::ProtocolVersion* _version = &(framing::highestProtocolVersion));
- ~Connection();
-
- /**
- * Opens a connection to a broker.
- *
- * @param host the host on which the broker is running
- *
- * @param port the port on the which the broker is listening
- *
- * @param uid the userid to connect with
- *
- * @param pwd the password to connect with (currently SASL
- * PLAIN is the only authentication method supported so this
- * is sent in clear text)
- *
- * @param virtualhost the AMQP virtual host to use (virtual
- * hosts, where implemented(!), provide namespace partitioning
- * within a single broker).
- */
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest", const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
- /**
- * Closes the connection. Any further use of this connection
- * (without reopening it) will not succeed.
- */
- void close();
- /**
- * Opens a Channel. In AMQP channels are like multi-plexed
- * 'sessions' of work over a connection. Almost all the
- * interaction with AMQP is done over a channel.
- *
- * @param channel a pointer to a channel instance that will be
- * used to represent the new channel.
- */
- void openChannel(Channel* channel);
- /*
- * Requests that the server close this channel, then removes
- * the association to the channel from this connection
- *
- * @param channel a pointer to the channel instance to close
- */
- void closeChannel(Channel* channel);
- /*
- * Removes the channel from association with this connection,
- * without sending a close request to the server.
- *
- * @param channel a pointer to the channel instance to
- * disassociate
- */
- void removeChannel(Channel* channel);
-
- virtual void received(framing::AMQFrame* frame);
-
- virtual void idleOut();
- virtual void idleIn();
-
- virtual void shutdown();
-
- /**
- * @return the maximum frame size in use on this connection
- */
- inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
- };
+ inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+
+ /** @return protocol version in use on this connection. */
+ const framing::ProtocolVersion& getVersion() { return version; }
+};
}
}