You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/02 20:09:51 UTC
svn commit: r562212 [1/2] - in /incubator/qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/tests/ python/qpid/
Author: gsim
Date: Thu Aug 2 11:09:48 2007
New Revision: 562212
URL: http://svn.apache.org/viewvc?view=rev&rev=562212
Log:
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'.
* Support for asynchronous retrieval of response or completion status.
* Channel methods no longer included in execution layers command id count.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ReceivedContent.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/IncomingMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/IncomingMessage.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
incubator/qpid/trunk/qpid/cpp/src/tests/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp
incubator/qpid/trunk/qpid/python/qpid/message.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/spec.py
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Aug 2 11:09:48 2007
@@ -229,12 +229,21 @@
qpid/client/ClientChannel.cpp \
qpid/client/ClientExchange.cpp \
qpid/client/ClientQueue.cpp \
- qpid/client/BasicMessageChannel.cpp \
qpid/client/Connector.cpp \
- qpid/client/IncomingMessage.cpp \
qpid/client/MessageListener.cpp \
qpid/client/ResponseHandler.cpp \
- qpid/client/ReturnedMessageHandler.cpp
+ qpid/client/ReturnedMessageHandler.cpp \
+ qpid/client/Correlator.cpp \
+ qpid/client/CompletionTracker.cpp \
+ qpid/client/ChannelHandler.cpp \
+ qpid/client/ConnectionHandler.cpp \
+ qpid/client/ExecutionHandler.cpp \
+ qpid/client/FutureCompletion.cpp \
+ qpid/client/FutureResponse.cpp \
+ qpid/client/FutureFactory.cpp \
+ qpid/client/ReceivedContent.cpp \
+ qpid/client/StateManager.cpp
+
nobase_include_HEADERS = \
$(platform_hdr) \
@@ -306,19 +315,29 @@
qpid/broker/TransactionalStore.h \
qpid/broker/TxAck.h \
qpid/client/AckMode.h \
- qpid/client/BasicMessageChannel.h \
qpid/client/ClientChannel.h \
qpid/client/ClientExchange.h \
qpid/client/ClientMessage.h \
qpid/client/ClientQueue.h \
qpid/client/Connection.h \
qpid/client/Connector.h \
- qpid/client/IncomingMessage.h \
qpid/client/MessageChannel.h \
qpid/client/MessageListener.h \
qpid/client/MethodBodyInstances.h \
qpid/client/ResponseHandler.h \
qpid/client/ReturnedMessageHandler.h \
+ qpid/client/BlockingQueue.h \
+ qpid/client/Correlator.h \
+ qpid/client/CompletionTracker.h \
+ qpid/client/ChannelHandler.h \
+ qpid/client/ChainableFrameHandler.h \
+ qpid/client/ConnectionHandler.h \
+ qpid/client/ExecutionHandler.h \
+ qpid/client/FutureCompletion.h \
+ qpid/client/FutureResponse.h \
+ qpid/client/FutureFactory.h \
+ qpid/client/ReceivedContent.h \
+ qpid/client/StateManager.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQContentBody.h \
qpid/framing/AMQDataBlock.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Aug 2 11:09:48 2007
@@ -68,7 +68,11 @@
handleL4(method, context);
//(if the frameset is complete) we can move the execution-mark
//forward
- ++(incoming.hwm);
+
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.hwm);
+ }
//note: need to be more sophisticated than this if we execute
//commands that arrive within an active message frameset (that
@@ -175,8 +179,11 @@
Mutex::ScopedLock l(outLock);
uint8_t type(body->type());
if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
- ++outgoing.hwm;
- //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ //temporary hack until channel management is moved to its own handler:
+ if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++outgoing.hwm;
+ //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ }
}
return ChannelAdapter::send(body, action);
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _BlockingQueue_
+#define _BlockingQueue_
+
+#include <queue>
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+struct QueueClosed {};
+
+template <class T>
+class BlockingQueue
+{
+ sys::Monitor lock;
+ std::queue<T> queue;
+ bool closed;
+
+public:
+
+ BlockingQueue() : closed(false) {}
+
+ void reset()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ closed = true;
+ }
+
+ T pop()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ while (!closed && queue.empty()) {
+ lock.wait();
+ }
+ if (closed) {
+ throw QueueClosed();
+ } else {
+ T t = queue.front();
+ queue.pop();
+ return t;
+ }
+ }
+
+ void push(T t)
+ {
+ sys::Monitor::ScopedLock l(lock);
+ bool wasEmpty = queue.empty();
+ queue.push(t);
+ if (wasEmpty) {
+ lock.notifyAll();
+ }
+ }
+
+ void close()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ closed = true;
+ lock.notifyAll();
+ }
+};
+
+}}
+
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ChainableFrameHandler_
+#define _ChainableFrameHandler_
+
+#include <boost/function.hpp>
+#include "qpid/framing/AMQFrame.h"
+
+namespace qpid {
+namespace client {
+
+struct ChainableFrameHandler
+{
+ typedef boost::function<void(framing::AMQFrame&)> FrameDelegate;
+
+ FrameDelegate in;
+ FrameDelegate out;
+
+ ChainableFrameHandler() {}
+ ChainableFrameHandler(FrameDelegate i, FrameDelegate o): in(i), out(o) {}
+ virtual ~ChainableFrameHandler() {}
+};
+
+}}
+
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChainableFrameHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ChannelHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
+
+void ChannelHandler::incoming(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ if (getState() == OPEN) {
+ if (isA<ChannelCloseBody>(body)) {
+ ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+ setState(CLOSED);
+ if (onClose) {
+ onClose(method->getReplyCode(), method->getReplyText());
+ }
+ } else {
+ try {
+ in(frame);
+ }catch(ChannelException& e){
+ if (body->type() == METHOD_BODY) {
+ AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ } else {
+ close(e.code, e.toString(), 0, 0);
+ }
+ }
+ }
+ } else {
+ if (body->type() == METHOD_BODY) {
+ handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ } else {
+ throw new ConnectionException(504, "Channel not open.");
+ }
+
+ }
+}
+
+void ChannelHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN) {
+ frame.channel = id;
+ out(frame);
+ } else {
+ throw Exception("Channel not open");
+ }
+}
+
+void ChannelHandler::open(uint16_t _id)
+{
+ id = _id;
+
+ setState(OPENING);
+ AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+ out(f);
+
+ std::set<int> states;
+ states.insert(OPEN);
+ states.insert(CLOSED);
+ waitFor(states);
+ if (getState() != OPEN) {
+ throw Exception("Failed to open channel.");
+ }
+}
+
+void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+ setState(CLOSING);
+ AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+ out(f);
+}
+
+void ChannelHandler::close()
+{
+ close(200, "OK", 0, 0);
+ waitFor(CLOSED);
+}
+
+void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+{
+ switch (getState()) {
+ case OPENING:
+ if (method->isA<ChannelOpenOkBody>()) {
+ setState(OPEN);
+ } else {
+ throw ConnectionException(504, "Channel not opened.");
+ }
+ break;
+ case CLOSING:
+ if (method->isA<ChannelCloseOkBody>()) {
+ setState(CLOSED);
+ } //else just ignore it
+ break;
+ case CLOSED:
+ throw ConnectionException(504, "Channel not opened.");
+ default:
+ throw Exception("Unexpected state encountered in ChannelHandler!");
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ChannelHandler_
+#define _ChannelHandler_
+
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/amqp_framing.h"
+
+namespace qpid {
+namespace client {
+
+class ChannelHandler : private StateManager, public ChainableFrameHandler
+{
+ enum STATES {OPENING, OPEN, CLOSING, CLOSED};
+ framing::ProtocolVersion version;
+ uint16_t id;
+
+ void handleMethod(framing::AMQMethodBody::shared_ptr method);
+
+ template <class T> bool isA(framing::AMQBody::shared_ptr body) {
+ return body->type() == framing::METHOD_BODY &&
+ boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
+ }
+
+
+ void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
+
+
+public:
+ typedef boost::function<void(uint16_t, const std::string&)> CloseListener;
+
+ ChannelHandler();
+
+ void incoming(framing::AMQFrame& frame);
+ void outgoing(framing::AMQFrame& frame);
+
+ void open(uint16_t id);
+ void close();
+
+ CloseListener onClose;
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Aug 2 11:09:48 2007
@@ -24,9 +24,12 @@
#include "qpid/sys/Monitor.h"
#include "ClientMessage.h"
#include "qpid/QpidError.h"
-#include "MethodBodyInstances.h"
#include "Connection.h"
-#include "BasicMessageChannel.h"
+#include "ConnectionHandler.h"
+#include "FutureResponse.h"
+#include "MessageListener.h"
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -45,18 +48,13 @@
}}
-Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
- switch (mode) {
- case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
- }
}
Channel::~Channel(){
closeInternal();
- stop();
}
void Channel::open(ChannelId id, Connection& con)
@@ -64,65 +62,15 @@
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
connection = &con;
- init(id, con, con.getVersion()); // ChannelAdapter initialization.
- string oob;
- if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
-}
-
-void Channel::protocolInit(
- const std::string& uid, const std::string& pwd, const std::string& vhost) {
- assert(connection);
- responses.expect();
- connection->connector->init(); // Send ProtocolInit block.
- ConnectionStartBody::shared_ptr connectionStart =
- responses.receive<ConnectionStartBody>();
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- ConnectionTuneBody::shared_ptr proposal =
- sendAndReceive<ConnectionTuneBody>(
- make_shared_ptr(new ConnectionStartOkBody(
- version, //connectionStart->getRequestId(),
- props, mechanism,
- response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
- version, //proposal->getRequestId(),
- proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat())));
-
- uint16_t heartbeat = proposal->getHeartbeat();
- connection->connector->setReadTimeout(heartbeat * 2);
- connection->connector->setWriteTimeout(heartbeat);
-
- // Send connection open.
- std::string capabilities;
- responses.expect();
- sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now
- //esp. as using force=true).
- AMQMethodBody::shared_ptr openResponse = responses.receive();
- if(openResponse->isA<ConnectionOpenOkBody>()) {
- //ok
- }else if(openResponse->isA<ConnectionRedirectBody>()){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
- QPID_LOG(error, "Ignoring redirect to " << redirect->getHost());
- } else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
- }
+ channelId = id;
+ //link up handlers:
+ channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
+ channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
+ executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
+ //set up close notification:
+ channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
+
+ channelHandler.open(id);
}
bool Channel::isOpen() const {
@@ -131,7 +79,11 @@
}
void Channel::setQos() {
- messaging->setQos();
+ executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ if(isTransactional()) {
+ //I think this is wrong! should only send TxSelect once...
+ executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+ }
}
void Channel::setPrefetch(uint16_t _prefetch){
@@ -143,14 +95,12 @@
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -179,131 +129,41 @@
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
- if (synch) synchWithServer();
+ sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
}
void Channel::commit(){
- send(make_shared_ptr(new TxCommitBody(version)));
+ executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- send(make_shared_ptr(new TxRollbackBody(version)));
-}
-
-void Channel::handleMethodInContext(
-AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
-{
- // Special case for consume OK as it is both an expected response
- // and needs handling in this thread.
- if (method->isA<BasicConsumeOkBody>()) {
- messaging->handle(method);
- responses.signalResponse(method);
- return;
- }
- if(responses.isWaiting()) {
- responses.signalResponse(method);
- return;
- }
- try {
- switch (method->amqpClassId()) {
- case MessageTransferBody::CLASS_ID:
- case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
- case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
- case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
- default: throw UnknownMethod();
- }
- }
- catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
- }
-
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
- switch (method->amqpMethodId()) {
- case ChannelCloseBody::METHOD_ID:
- sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
- return;
- case ChannelFlowBody::METHOD_ID:
- // FIXME aconway 2007-02-22: Not yet implemented.
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
- connection->close();
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
- Monitor::ScopedLock l(outgoingMonitor);
- //record the completion mark:
- outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
- //TODO: notify anyone waiting for completion notification:
- outgoingMonitor.notifyAll();
- } else{
- throw UnknownMethod();
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+ executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
}
-void Channel::start(){
- running = true;
- dispatcher = Thread(*messaging);
-}
-
-// Close called by local application.
-void Channel::close(
- uint16_t code, const std::string& text,
- ClassId classId, MethodId methodId)
+void Channel::close()
{
- if (isOpen()) {
- try {
- if (getId() != 0) {
- if (code == 200) messaging->cancelAll();
-
- sendAndReceive<ChannelCloseOkBody>(
- make_shared_ptr(new ChannelCloseBody(
- version, code, text, classId, methodId)));
- }
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- } catch (...) {
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- throw;
+ channelHandler.close();
+ {
+ Mutex::ScopedLock l(lock);
+ if (connection);
+ {
+ connection->erase(channelId);
+ connection = 0;
}
}
stop();
}
+
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
+void Channel::peerClose(uint16_t code, const std::string& message) {
assert(isOpen());
//record reason:
- errorCode = reason->getReplyCode();
- errorText = reason->getReplyText();
+ errorCode = code;
+ errorText = message;
closeInternal();
+ stop();
+ futures.close(code, message);
}
void Channel::closeInternal() {
@@ -311,26 +171,26 @@
if (connection);
{
connection = 0;
- messaging->close();
- // A 0 response means we are closed.
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
}
-void Channel::stop() {
- Mutex::ScopedLock l(stopLock);
- if(running) {
- dispatcher.join();
- running = false;
- }
+AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
+{
+
+ boost::shared_ptr<FutureResponse> fr(futures.createResponse());
+ executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
+ return fr->getResponse();
}
-AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
+void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
{
- responses.expect();
- sendCommand(toSend);
- return responses.receive(c, m);
+ if(sync) {
+ boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
+ executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
+ fc->waitForCompletion();
+ } else {
+ executionHandler.send(command);
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
@@ -339,68 +199,138 @@
if(sync)
return sendAndReceive(body, c, m);
else {
- sendCommand(body);
+ executionHandler.send(body);
return AMQMethodBody::shared_ptr();
}
}
void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
- messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+
+ if (tag.empty()) {
+ throw Exception("A tag must be specified for a consumer.");
+ }
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ make_shared_ptr(new BasicConsumeBody(
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable())));
}
void Channel::cancel(const std::string& tag, bool synch) {
- messaging->cancel(tag, synch);
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- bool result = messaging->get(msg, queue, ackMode);
- if (!isOpen()) {
- throw ChannelException(errorCode, errorText);
+
+ AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ AMQMethodBody::shared_ptr response = sendAndReceive(request);
+ if (response && response->isA<BasicGetEmptyBody>()) {
+ return false;
+ } else {
+ ReceivedContent::shared_ptr content = gets.pop();
+ content->populate(msg);
+ return true;
}
- return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,
const std::string& routingKey,
bool mandatory, bool immediate) {
- messaging->publish(msg, exchange, routingKey, mandatory, immediate);
-}
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
- messaging->setReturnedMessageHandler(handler);
-}
+ const string e = exchange.getName();
+ string key = routingKey;
-void Channel::run() {
- messaging->run();
+ executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)),
+ msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
+ /*
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ executionHandler.sendContent(header);
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+ */
}
-void Channel::sendCommand(AMQBody::shared_ptr body)
-{
- ++(outgoing.hwm);
- send(body);
+void Channel::start(){
+ running = true;
+ dispatcher = Thread(*this);
}
-bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
-{
- AbsTime end;
- if (timeout == 0) {
- end = AbsTime::FarFuture();
- } else {
- end = AbsTime(AbsTime::now(), timeout);
- }
-
- Monitor::ScopedLock l(outgoingMonitor);
- while (end > AbsTime::now() && outgoing.lwm < poi) {
- outgoingMonitor.wait(end);
+void Channel::stop() {
+ executionHandler.received.close();
+ gets.close();
+ Mutex::ScopedLock l(stopLock);
+ if(running) {
+ dispatcher.join();
+ running = false;
}
- return !(outgoing.lwm < poi);
}
-bool Channel::synchWithServer(Duration timeout)
-{
- send(make_shared_ptr(new ExecutionFlushBody(version)));
- return waitForCompletion(outgoing.hwm, timeout);
+void Channel::run() {
+ try {
+ while (true) {
+ ReceivedContent::shared_ptr content = executionHandler.received.pop();
+ //need to dispatch this to the relevant listener:
+ if (content->isA<BasicDeliverBody>()) {
+ ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
+ if (i != consumers.end()) {
+ Message msg;
+ content->populate(msg);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
+ }
+ } else if (content->isA<BasicGetOkBody>()) {
+ gets.push(content);
+ } else {
+ QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
+ }
+ }
+ } catch (const QueueClosed&) {}
}
-
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Aug 2 11:09:48 2007
@@ -26,10 +26,12 @@
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
-#include "ResponseHandler.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
#include "qpid/Exception.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -54,19 +56,23 @@
*
* \ingroup clientapi
*/
-class Channel : public framing::ChannelAdapter
+class Channel : private sys::Runnable
{
private:
struct UnknownMethod {};
typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+
+ struct Consumer{
+ MessageListener* listener;
+ AckMode ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<std::string, Consumer> ConsumerMap;
mutable sys::Mutex lock;
- boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
sys::Thread dispatcher;
- ResponseHandler responses;
- sys::Monitor outgoingMonitor;
- framing::Window outgoing;
uint16_t prefetch;
const bool transactional;
@@ -78,32 +84,29 @@
sys::Mutex stopLock;
bool running;
- void stop();
+ ConsumerMap consumers;
+ ExecutionHandler executionHandler;
+ ChannelHandler channelHandler;
+ framing::ChannelId channelId;
+ BlockingQueue<ReceivedContent::shared_ptr> gets;
+ FutureFactory futures;
- void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- void handleContent(framing::AMQContentBody::shared_ptr body);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
- void handleConnection(framing::AMQMethodBody::shared_ptr method);
- void handleExecution(framing::AMQMethodBody::shared_ptr method);
+ void stop();
void setQos();
-
- void protocolInit(
- const std::string& uid, const std::string& pwd,
- const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
+ framing::ClassId = 0, framing::MethodId = 0);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
framing::AMQMethodBody::shared_ptr,
framing::ClassId, framing::MethodId);
+ void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
+
+
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
@@ -118,21 +121,16 @@
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- void sendCommand(framing::AMQBody::shared_ptr body);
-
void open(framing::ChannelId, Connection&);
void closeInternal();
- void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
- bool waitForCompletion(framing::SequenceNumber, sys::Duration);
-
+ void peerClose(uint16_t, const std::string&);
+
// FIXME aconway 2007-02-23: Get rid of friendships.
- friend class Connection;
- friend class BasicMessageChannel; // for sendAndReceive.
- friend class MessageMessageChannel; // for sendAndReceive.
+ friend class Connection;
+ friend class BasicMessageChannel; // for sendAndReceive.
+ friend class MessageMessageChannel; // for sendAndReceive.
public:
- enum InteropMode { AMQP_08, AMQP_09 };
-
/**
* Creates a channel object.
*
@@ -143,16 +141,10 @@
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- *
- * @param messageImpl Alternate messaging implementation class to
- * allow alternate protocol implementations of messaging
- * operations. Takes ownership.
- */
- Channel(
- bool transactional = false, u_int16_t prefetch = 500,
- InteropMode=AMQP_08);
+ */
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
+ ~Channel();
/**
* Declares an exchange.
@@ -254,12 +246,10 @@
void start();
/**
- * Close the channel with optional error information.
- * Closing a channel that is not open has no effect.
+ * Close the channel. Closing a channel that is not open has no
+ * effect.
*/
- void close(
- framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
+ void close();
/** True if the channel is transactional */
bool isTransactional() { return transactional; }
@@ -301,7 +291,7 @@
* is received from the broker
*/
void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
const framing::FieldTable* fields = 0);
@@ -353,22 +343,9 @@
bool mandatory = false, bool immediate = false);
/**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- /**
- * Deliver messages from the broker to the appropriate MessageListener.
+ * Deliver incoming messages to the appropriate MessageListener.
*/
void run();
-
- /**
- * TESTING ONLY FOR NOW!
- */
- bool synchWithServer(sys::Duration timeout = 0);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Aug 2 11:09:48 2007
@@ -46,11 +46,13 @@
Connection::Connection(
bool _debug, uint32_t _max_frame_size,
framing::ProtocolVersion _version
-) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+ ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
+
+ handler.maxFrameSize = _max_frame_size;
}
Connection::~Connection(){}
@@ -58,7 +60,7 @@
void Connection::setConnector(Connector& con)
{
connector = &con;
- connector->setInputHandler(this);
+ connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
@@ -70,10 +72,19 @@
{
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+
+ //wire up the handler:
+ handler.in = boost::bind(&Connection::received, this, _1);
+ handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.onClose = boost::bind(&Connection::closeChannels, this);
+
+ handler.uid = uid;
+ handler.pwd = pwd;
+ handler.vhost = vhost;
+
connector->connect(host, port);
- channels[0] = &channel0;
- channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, vhost);
+ connector->init();
+ handler.waitForOpen();
isOpen = true;
}
@@ -87,14 +98,12 @@
}
void Connection::close(
- ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+ ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
)
{
if(markClosed()) {
try {
- channel0.sendAndReceive<ConnectionCloseOkBody>(
- make_shared_ptr(new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId)));
+ handler.close();
} catch (const std::exception& e) {
QPID_LOG(error, "Exception closing channel: " << e.what());
}
@@ -138,33 +147,14 @@
void Connection::received(AMQFrame& frame){
ChannelId id = frame.getChannel();
Channel* channel = channels[id];
- if (channel == 0)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR+504,
- (boost::format("Invalid channel number %g") % id).str());
- try{
- channel->getHandlers().in->handle(frame);
- }catch(const qpid::QpidError& e){
- std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
- channelException(
- *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
+ if (channel == 0) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
+ channel->channelHandler.incoming(frame);
}
void Connection::send(AMQFrame& frame) {
out->send(frame);
-}
-
-void Connection::channelException(
- Channel& channel, AMQMethodBody* method, const QpidError& e)
-{
- int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0)
- channel.close(code, msg);
- else
- channel.close(
- code, msg, method->amqpClassId(), method->amqpMethodId());
}
void Connection::idleIn(){
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "CompletionTracker.h"
+
+using qpid::client::CompletionTracker;
+using namespace qpid::framing;
+using namespace boost;
+
+CompletionTracker::CompletionTracker() {}
+CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+
+
+void CompletionTracker::completed(const SequenceNumber& _mark)
+{
+ sys::Mutex::ScopedLock l(lock);
+ mark = _mark;
+ while (!listeners.empty() && !(listeners.front().first > mark)) {
+ Listener f(listeners.front().second);
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ f();
+ }
+ listeners.pop();
+ }
+}
+
+void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+{
+ if (!add(point, listener)) {
+ listener();
+ }
+}
+
+bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (point < mark) {
+ return false;
+ } else {
+ listeners.push(make_pair(point, listener));
+ return true;
+ }
+}
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <queue>
+#include <boost/function.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+
+#ifndef _CompletionTracker_
+#define _CompletionTracker_
+
+namespace qpid {
+namespace client {
+
+class CompletionTracker
+{
+public:
+ typedef boost::function<void()> Listener;
+
+ CompletionTracker();
+ CompletionTracker(const framing::SequenceNumber& mark);
+ void completed(const framing::SequenceNumber& mark);
+ void listen(const framing::SequenceNumber& point, Listener l);
+
+private:
+ sys::Mutex lock;
+ framing::SequenceNumber mark;
+ std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+
+ bool add(const framing::SequenceNumber& point, Listener l);
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?view=diff&rev=562212&r1=562211&r2=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Aug 2 11:09:48 2007
@@ -26,6 +26,7 @@
#include "qpid/QpidError.h"
#include "ClientChannel.h"
#include "Connector.h"
+#include "ConnectionHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -79,17 +80,15 @@
framing::ProtocolVersion version;
const uint32_t max_frame_size;
ChannelMap channels;
+ ConnectionHandler handler;
Connector defaultConnector;
Connector* connector;
framing::OutputHandler* out;
bool isOpen;
sys::Mutex shutdownLock;
- Channel channel0;
bool debug;
void erase(framing::ChannelId);
- void channelException(
- Channel&, framing::AMQMethodBody*, const QpidError&);
void closeChannels();
bool markClosed();
@@ -174,7 +173,7 @@
inline uint32_t getMaxFrameSize(){ return max_frame_size; }
/** @return protocol version in use on this connection. */
- framing::ProtocolVersion getVersion() const { return version; }
+ //framing::ProtocolVersion getVersion() const { return version; }
};
}} // namespace qpid::client
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_framing.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+namespace {
+const std::string OK("OK");
+}
+
+ConnectionHandler::ConnectionHandler()
+ : StateManager(NOT_STARTED)
+{
+
+ mechanism = "PLAIN";
+ locale = "en_US";
+ heartbeat = 0;
+ maxChannels = 32767;
+ maxFrameSize = 65536;
+ insist = true;
+ version = framing::highestProtocolVersion;
+
+ ESTABLISHED.insert(FAILED);
+ ESTABLISHED.insert(OPEN);
+}
+
+void ConnectionHandler::incoming(AMQFrame& frame)
+{
+ if (getState() == CLOSED) {
+ throw Exception("Connection is closed.");
+ }
+
+ AMQBody::shared_ptr body = frame.getBody();
+ if (frame.getChannel() == 0) {
+ if (body->type() == METHOD_BODY) {
+ handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ } else {
+ error(503, "Cannot send content on channel zero.");
+ }
+ } else {
+ switch(getState()) {
+ case OPEN:
+ try {
+ in(frame);
+ }catch(ConnectionException& e){
+ error(e.code, e.toString(), body);
+ }catch(std::exception& e){
+ error(541/*internal error*/, e.what(), body);
+ }
+ break;
+ case CLOSING:
+ QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
+ break;
+ default:
+ //must be in connection initialisation:
+ fail("Cannot receive frames on non-zero channel until connection is established.");
+ }
+ }
+}
+
+void ConnectionHandler::outgoing(AMQFrame& frame)
+{
+ if (getState() == OPEN) {
+ out(frame);
+ } else {
+ throw Exception("Connection is not open.");
+ }
+}
+
+void ConnectionHandler::waitForOpen()
+{
+ waitFor(ESTABLISHED);
+ if (getState() == FAILED) {
+ throw Exception("Failed to establish connection.");
+ }
+}
+
+void ConnectionHandler::close()
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
+
+ waitFor(CLOSED);
+}
+
+void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+{
+ AMQFrame f;
+ f.setBody(body);
+ out(f);
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
+{
+ setState(CLOSING);
+ send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+}
+
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+{
+ if (body->type() == METHOD_BODY) {
+ AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ error(code, message, method->amqpClassId(), method->amqpMethodId());
+ } else {
+ error(code, message);
+ }
+}
+
+
+void ConnectionHandler::fail(const std::string& message)
+{
+ QPID_LOG(error, message);
+ setState(FAILED);
+}
+
+void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+{
+ switch (getState()) {
+ case NOT_STARTED:
+ if (method->isA<ConnectionStartBody>()) {
+ setState(NEGOTIATING);
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ } else {
+ fail("Bad method sequence, expected connection-start.");
+ }
+ break;
+ case NEGOTIATING:
+ if (method->isA<ConnectionTuneBody>()) {
+ ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ heartbeat = proposal->getHeartbeat();
+ maxChannels = proposal->getChannelMax();
+ send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ setState(OPENING);
+ send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-tune.");
+ }
+ break;
+ case OPENING:
+ if (method->isA<ConnectionOpenOkBody>()) {
+ setState(OPEN);
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
+ } else {
+ fail("Unexpected method sequence, expected connection-open-ok.");
+ }
+ break;
+ case OPEN:
+ if (method->isA<ConnectionCloseBody>()) {
+ send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
+ }
+ break;
+ case CLOSING:
+ if (method->isA<ConnectionCloseOkBody>()) {
+ setState(CLOSED);
+ if (onClose) {
+ onClose();
+ }
+ } else {
+ QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored.");
+ }
+ break;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionHandler_
+#define _ConnectionHandler_
+
+#include "Connector.h"
+#include "StateManager.h"
+#include "ChainableFrameHandler.h"
+#include "qpid/framing/InputHandler.h"
+
+namespace qpid {
+namespace client {
+
+struct ConnectionProperties
+{
+ std::string uid;
+ std::string pwd;
+ std::string vhost;
+ framing::FieldTable properties;
+ std::string mechanism;
+ std::string locale;
+ std::string capabilities;
+ uint16_t heartbeat;
+ uint16_t maxChannels;
+ uint64_t maxFrameSize;
+ bool insist;
+ framing::ProtocolVersion version;
+};
+
+class ConnectionHandler : private StateManager,
+ public ConnectionProperties,
+ public ChainableFrameHandler,
+ public framing::InputHandler
+{
+ enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
+ std::set<int> ESTABLISHED;
+
+ void handle(framing::AMQMethodBody::shared_ptr method);
+ void send(framing::AMQBody::shared_ptr body);
+ void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
+ void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+ void fail(const std::string& message);
+
+public:
+ typedef boost::function<void()> CloseListener;
+
+ ConnectionHandler();
+
+ void received(framing::AMQFrame& f) { incoming(f); }
+
+ void incoming(framing::AMQFrame& frame);
+ void outgoing(framing::AMQFrame& frame);
+
+ void waitForOpen();
+ void close();
+
+ CloseListener onClose;
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+using qpid::client::Correlator;
+using namespace qpid::framing;
+using namespace boost;
+
+void Correlator::receive(AMQMethodBody::shared_ptr response)
+{
+ if (listeners.empty()) {
+ throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+ } else {
+ Listener l = listeners.front();
+ if (l) l(response);
+ listeners.pop();
+ }
+}
+
+void Correlator::listen(Listener l)
+{
+ listeners.push(l);
+}
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <queue>
+#include <set>
+#include <boost/function.hpp>
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/sys/Monitor.h"
+
+#ifndef _Correlator_
+#define _Correlator_
+
+namespace qpid {
+namespace client {
+
+
+class Correlator
+{
+public:
+ typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+
+ void receive(framing::AMQMethodBody::shared_ptr);
+ void listen(Listener l);
+
+private:
+ std::queue<Listener> listeners;
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExecutionHandler.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace boost;
+
+bool isMessageMethod(AMQMethodBody::shared_ptr method)
+{
+ return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
+}
+
+bool isMessageMethod(AMQBody::shared_ptr body)
+{
+ return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+}
+
+bool isContentFrame(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ uint8_t type = body->type();
+ return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
+}
+
+bool invoke(AMQBody::shared_ptr body, Invocable* target)
+{
+ return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+}
+
+ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {}
+
+//incoming:
+void ExecutionHandler::handle(AMQFrame& frame)
+{
+ AMQBody::shared_ptr body = frame.getBody();
+ if (!invoke(body, this)) {
+ if (isContentFrame(frame)) {
+ if (!arriving) {
+ arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ }
+ arriving->append(body);
+ if (arriving->isComplete()) {
+ received.push(arriving);
+ arriving.reset();
+ }
+ } else {
+ ++incoming.hwm;
+ correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ }
+ }
+}
+
+void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+{
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ completion.completed(outgoing.lwm);
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to manage (record and accumulate) ranges such
+ //that we can implictly move the mark when appropriate
+
+ //TODO: signal listeners of early notification?
+ }
+}
+
+void ExecutionHandler::flush()
+{
+ //send completion
+ incoming.lwm = incoming.hwm;
+ //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
+}
+
+void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+{
+ //allocate id:
+ ++outgoing.hwm;
+ //register listeners if necessary:
+ if (f) {
+ completion.listen(outgoing.hwm, f);
+ }
+ if (g) {
+ correlation.listen(g);
+ }
+
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ out(frame);
+
+ if (f) {
+ AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ out(frame);
+ }
+}
+
+void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
+{
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content);
+ out(frame);
+}
+
+void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+ uint64_t frameSize,
+ CompletionTracker::Listener f, Correlator::Listener g)
+{
+ send(command, f, g);
+
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
+ header->setContentSize(data.size());
+ AMQFrame h(version, 0, header);
+ out(h);
+
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = frameSize - 8;
+ if(data_length < frag_size){
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ out(frame);
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ out(frame);
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExecutionHandler_
+#define _ExecutionHandler_
+
+#include <queue>
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "BlockingQueue.h"
+#include "ChainableFrameHandler.h"
+#include "CompletionTracker.h"
+#include "Correlator.h"
+#include "ReceivedContent.h"
+
+namespace qpid {
+namespace client {
+
+class ExecutionHandler :
+ private framing::AMQP_ServerOperations::ExecutionHandler,
+ public ChainableFrameHandler
+{
+ framing::Window incoming;
+ framing::Window outgoing;
+ ReceivedContent::shared_ptr arriving;
+ Correlator correlation;
+ CompletionTracker completion;
+ framing::ProtocolVersion version;
+
+ void complete(uint32_t mark, framing::SequenceNumberSet range);
+ void flush();
+
+public:
+ BlockingQueue<ReceivedContent::shared_ptr> received;
+
+ ExecutionHandler();
+
+ void handle(framing::AMQFrame& frame);
+ void send(framing::AMQBody::shared_ptr command,
+ CompletionTracker::Listener f = CompletionTracker::Listener(),
+ Correlator::Listener g = Correlator::Listener());
+ void sendContent(framing::AMQBody::shared_ptr command,
+ const framing::BasicHeaderProperties& headers, const std::string& data,
+ uint64_t frameSize,
+ CompletionTracker::Listener f = CompletionTracker::Listener(),
+ Correlator::Listener g = Correlator::Listener());
+
+ void sendContent(framing::AMQBody::shared_ptr content);
+};
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureCompletion.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+
+bool FutureCompletion::isComplete()
+{
+ Monitor::ScopedLock l(lock);
+ return complete;
+}
+
+void FutureCompletion::completed()
+{
+ Monitor::ScopedLock l(lock);
+ complete = true;
+ lock.notifyAll();
+}
+
+void FutureCompletion::waitForCompletion()
+{
+ Monitor::ScopedLock l(lock);
+ while (!complete && !closed) {
+ lock.wait();
+ }
+ if (closed) {
+ throw ChannelException(code, text);
+ }
+}
+
+void FutureCompletion::close(uint16_t _code, const std::string& _text)
+{
+ Monitor::ScopedLock l(lock);
+ complete = true;
+ closed = true;
+ code = _code;
+ text = _text;
+ lock.notifyAll();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureCompletion_
+#define _FutureCompletion_
+
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace client {
+
+class FutureCompletion
+{
+protected:
+ sys::Monitor lock;
+ bool complete;
+ bool closed;
+ uint16_t code;
+ std::string text;
+
+public:
+ FutureCompletion();
+ virtual ~FutureCompletion(){}
+ bool isComplete();
+ void waitForCompletion();
+ void completed();
+ void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureCompletion.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp Thu Aug 2 11:09:48 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureFactory.h"
+
+using namespace qpid::client;
+using namespace boost;
+
+shared_ptr<FutureCompletion> FutureFactory::createCompletion()
+{
+ shared_ptr<FutureCompletion> f(new FutureCompletion());
+ weak_ptr<FutureCompletion> w(f);
+ set.push_back(w);
+ return f;
+}
+
+shared_ptr<FutureResponse> FutureFactory::createResponse()
+{
+ shared_ptr<FutureResponse> f(new FutureResponse());
+ weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f));
+ set.push_back(w);
+ return f;
+}
+
+void FutureFactory::close(uint16_t code, const std::string& text)
+{
+ for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) {
+ shared_ptr<FutureCompletion> p = i->lock();
+ if (p) {
+ p->close(code, text);
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h?view=auto&rev=562212
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h Thu Aug 2 11:09:48 2007
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureFactory_
+#define _FutureFactory_
+
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+
+namespace qpid {
+namespace client {
+
+class FutureFactory
+{
+ typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet;
+ WeakPtrSet set;
+
+public:
+ boost::shared_ptr<FutureCompletion> createCompletion();
+ boost::shared_ptr<FutureResponse> createResponse();
+ void close(uint16_t code, const std::string& text);
+};
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureFactory.h
------------------------------------------------------------------------------
svn:keywords = Rev Date