You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/04 21:42:21 UTC
svn commit: r633627 - in /incubator/qpid/trunk/qpid/cpp: rubygen/99-0/ src/
src/qpid/broker/ src/qpid/framing/ xml/
Author: gsim
Date: Tue Mar 4 12:42:19 2008
New Revision: 633627
URL: http://svn.apache.org/viewvc?rev=633627&view=rev
Log:
Further updates to support final 0-10 spec
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb
incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h
incubator/qpid/trunk/qpid/cpp/xml/extra.xml
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/99-0/Session.rb Tue Mar 4 12:42:19 2008
@@ -6,9 +6,9 @@
class CppGen
def session_methods
- excludes = ["channel", "connection", "session", "execution", "connection010", "session010"]
+ excludes = ["channel", "connection", "session", "execution"]
gen_methods=@amqp.methods_on(@chassis).reject { |m|
- excludes.include? m.parent.name
+ excludes.include? m.parent.name or m.body_name.include?("010")
}
end
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/99-0/structs.rb Tue Mar 4 12:42:19 2008
@@ -39,7 +39,8 @@
end
def execution_header?(s)
- false and s.kind_of? AmqpMethod and s.parent.l4?
+ #false and s.kind_of? AmqpMethod and s.parent.l4?
+ s.kind_of? AmqpMethod and s.parent.name.include?("010") and not s.parent.control?
end
def has_bitfields_only(s)
@@ -453,7 +454,7 @@
{
EOS
if (execution_header?(s))
- genl "ModelMethod::encode(buffer);"
+ genl "encodeHeader(buffer);"
end
if (is_packed(s))
@@ -473,7 +474,7 @@
genl "buffer.put#{s.size.caps}(bodySize() + 2/*typecode*/);" if s.size
genl "buffer.putShort(TYPE);"
else
- genl "buffer.put#{s.size.caps}(size());" if s.size
+ genl "buffer.put#{s.size.caps}(bodySize());" if s.size
end
end
genl "encodeStructBody(buffer);"
@@ -485,7 +486,7 @@
{
EOS
if (execution_header?(s))
- genl "ModelMethod::decode(buffer);"
+ genl "decodeHeader(buffer);"
end
if (is_packed(s))
@@ -514,7 +515,7 @@
uint32_t total = 0;
EOS
if (execution_header?(s))
- genl "total += ModelMethod::size();"
+ genl "total += headerSize();"
end
if (is_packed(s))
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Mar 4 12:42:19 2008
@@ -164,6 +164,7 @@
$(mgen_broker_cpp) \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
+ qpid/broker/SessionAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
@@ -273,6 +274,7 @@
qpid/shared_ptr.h \
qpid/broker/Broker.h \
qpid/broker/BrokerAdapter.h \
+ qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Mar 4 12:42:19 2008
@@ -80,6 +80,10 @@
TunnelHandler* getTunnelHandler() {
throw framing::NotImplementedException("Tunnel class not implemented"); }
+ Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+
// Handlers no longer implemented in BrokerAdapter:
#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Mar 4 12:42:19 2008
@@ -117,7 +117,7 @@
if (mgmtClosing)
close (403, "Closed by Management Request", 0, 0);
- if (frame.getChannel() == 0) {
+ if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Mar 4 12:42:19 2008
@@ -35,6 +35,7 @@
using std::string;
TransferAdapter Message::TRANSFER;
+PreviewAdapter Message::TRANSFER_99_0;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
@@ -219,6 +220,8 @@
{
if (!adapter) {
if(frames.isA<MessageTransferBody>()) {
+ adapter = &TRANSFER_99_0;
+ } else if(frames.isA<Message010TransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Mar 4 12:42:19 2008
@@ -133,6 +133,7 @@
mutable MessageAdapter* adapter;
static TransferAdapter TRANSFER;
+ static PreviewAdapter TRANSFER_99_0;
MessageAdapter& getAdapter() const;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Tue Mar 4 12:42:19 2008
@@ -36,12 +36,12 @@
std::string TransferAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::MessageTransferBody>()->getDestination();
+ return f.as<framing::Message010TransferBody>()->getDestination();
}
bool TransferAdapter::isImmediate(const framing::FrameSet&)
{
- //TODO: we seem to have lost the immediate flag
+ //TODO: delete this, immediate is no longer part of the spec
return false;
}
@@ -57,4 +57,8 @@
return p && p->getDeliveryMode() == 2;
}
+ std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Tue Mar 4 12:42:19 2008
@@ -29,6 +29,7 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Message010TransferBody.h"
namespace qpid {
namespace broker {
@@ -48,10 +49,15 @@
struct TransferAdapter : MessageAdapter
{
std::string getRoutingKey(const framing::FrameSet& f);
- std::string getExchange(const framing::FrameSet& f);
+ virtual std::string getExchange(const framing::FrameSet& f);
bool isImmediate(const framing::FrameSet&);
const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
bool isPersistent(const framing::FrameSet& f);
+};
+
+struct PreviewAdapter : TransferAdapter
+{
+ std::string getExchange(const framing::FrameSet& f);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Mar 4 12:42:19 2008
@@ -28,6 +28,10 @@
using namespace qpid::broker;
using namespace qpid::framing;
+namespace
+{
+ std::string type_str(uint8_t type);
+}
MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
@@ -39,7 +43,19 @@
state = HEADER;
break;
case HEADER:
- checkType(HEADER_BODY, frame.getBody()->type());
+ switch (frame.getBody()->type()) {
+ case CONTENT_BODY:
+ //TODO: rethink how to handle non-existent headers...
+ //didn't get a header: add in a dummy
+ message->getFrames().append(AMQFrame(AMQHeaderBody()));
+ break;
+ case HEADER_BODY:
+ break;
+ default:
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ << type_str(frame.getBody()->type()) << ")"));
+ }
state = CONTENT;
break;
case CONTENT:
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=633627&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Mar 4 12:42:19 2008
@@ -0,0 +1,382 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "SessionAdapter.h"
+#include "Connection.h"
+#include "DeliveryToken.h"
+#include "MessageDelivery.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
+#include <boost/format.hpp>
+#include <boost/cast.hpp>
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr> QueueVector;
+
+// Builds the adapter together with its exchange, queue and message handler
+// implementations; all of them are constructed over the same SemanticState.
+SessionAdapter::SessionAdapter(SemanticState& s)
+    : HandlerImpl(s), exchangeImpl(s), queueImpl(s), messageImpl(s)
+{
+}
+
+
+/**
+ * Declares an exchange, or (when passive) only verifies an existing one.
+ *
+ * A non-empty alternateExchange is looked up in the exchange registry before
+ * anything else. In passive mode the named exchange is fetched and checked
+ * against the requested type and alternate. Otherwise the exchange is
+ * declared; a newly created exchange is recorded in the store when durable
+ * and has its alternate attached, while an already existing one is checked
+ * the same way as in passive mode.
+ *
+ * The autoDelete flag is currently ignored.
+ *
+ * @throws CommandInvalidException if the registry reports an unknown exchange type
+ */
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+                                                  const string& alternateExchange,
+                                                  bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+    //TODO: implement autoDelete
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = getBroker().getExchanges().get(alternateExchange);
+    }
+
+    if (passive) {
+        // Passive declare never creates anything: just validate what is there.
+        Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
+        checkType(actual, type);
+        checkAlternate(actual, alternate);
+        return;
+    }
+
+    try {
+        std::pair<Exchange::shared_ptr, bool> response =
+            getBroker().getExchanges().declare(exchange, type, durable, args);
+        const bool created = response.second;
+        if (!created) {
+            // Exchange already existed: the request must agree with it.
+            checkType(response.first, type);
+            checkAlternate(response.first, alternate);
+        } else {
+            if (durable) {
+                getBroker().getStore().create(*response.first);
+            }
+            if (alternate) {
+                response.first->setAlternate(alternate);
+                alternate->incAlternateUsers();
+            }
+        }
+    } catch (const UnknownExchangeTypeException&) {
+        throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
+    }
+}
+
+// Ensures a requested exchange type agrees with an existing exchange.
+// An empty requested type is accepted without comparison; any other value
+// that differs from the exchange's type raises NotAllowedException.
+void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
+{
+    const bool mismatch = !type.empty() && exchange->getType() != type;
+    if (mismatch) {
+        throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
+    }
+}
+
+/**
+ * Verifies that a requested alternate-exchange matches the alternate already
+ * set on an existing exchange.
+ *
+ * Nothing is checked when no alternate was requested (null pointer).
+ *
+ * @param exchange  the existing exchange being (re)declared
+ * @param alternate the alternate requested by the caller; may be null
+ * @throws NotAllowedException if a requested alternate differs from the
+ *         exchange's current alternate (including the case where the
+ *         exchange has none)
+ */
+void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
+{
+    if (alternate && alternate != exchange->getAlternate()) {
+        // The existing exchange may have no alternate at all, so its name must
+        // not be read unconditionally while building the error message.
+        throw NotAllowedException(
+            QPID_MSG("Exchange declared with alternate-exchange "
+                     << (exchange->getAlternate()
+                         ? exchange->getAlternate()->getName()
+                         : std::string("<nonexistent>"))
+                     << ", requested "
+                     << alternate->getName()));
+    }
+}
+
+// Deletes the named exchange. Refuses (NotAllowedException) while the exchange
+// is still used as another entity's alternate. A durable exchange is removed
+// from the store, and the use count of its own alternate (if any) is
+// decremented before the exchange is dropped from the registry.
+// The ifUnused flag is currently ignored.
+void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){
+    //TODO: implement unused
+    Exchange::shared_ptr doomed(getBroker().getExchanges().get(name));
+
+    if (doomed->inUseAsAlternate()) {
+        throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    }
+    if (doomed->isDurable()) {
+        getBroker().getStore().destroy(*doomed);
+    }
+    if (doomed->getAlternate()) {
+        doomed->getAlternate()->decAlternateUsers();
+    }
+    getBroker().getExchanges().destroy(name);
+}
+
+// Reports the type, durability and arguments of the named exchange.
+// If the registry lookup throws a ChannelException, an empty result is
+// returned with the third field set to true instead (presumably the
+// "not found" flag -- confirm against the generated Exchange010QueryResult).
+Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
+{
+    try {
+        Exchange::shared_ptr found(getBroker().getExchanges().get(name));
+        return Exchange010QueryResult(found->getType(),
+                                      found->isDurable(),
+                                      false,
+                                      found->getArgs());
+    } catch (const ChannelException&) {
+        return Exchange010QueryResult("", false, true, FieldTable());
+    }
+}
+/**
+ * Binds a queue to an exchange.
+ *
+ * The queue is resolved through the session state and the exchange through
+ * the broker's exchange registry. When both routingKey and queueName are
+ * empty, the resolved queue's own name is used as the key given to the
+ * exchange. If the exchange reports a new binding, it is recorded on the
+ * queue and, when both exchange and queue are durable, in the store.
+ *
+ * @throws NotFoundException if the exchange lookup yields a null pointer
+ */
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+                                               const string& exchangeName, const string& routingKey,
+                                               const FieldTable& arguments){
+
+    Queue::shared_ptr queue = state.getQueue(queueName);
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+    if (!exchange) {
+        throw NotFoundException(
+            "Bind failed. No such exchange: " + exchangeName);
+    }
+
+    string exchangeRoutingKey;
+    if (routingKey.empty() && queueName.empty()) {
+        exchangeRoutingKey = queue->getName();
+    } else {
+        exchangeRoutingKey = routingKey;
+    }
+
+    if (!exchange->bind(queue, exchangeRoutingKey, &arguments)) {
+        return;
+    }
+    // Note: the queue and the store are given the caller's routingKey, not the
+    // defaulted exchangeRoutingKey.
+    queue->bound(exchangeName, routingKey, arguments);
+    if (exchange->isDurable() && queue->isDurable()) {
+        getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
+    }
+}
+
+/**
+ * Removes the binding between a queue and an exchange for the given key.
+ *
+ * When the exchange reports that a binding was removed and both the exchange
+ * and the queue are durable, the binding is also removed from the store.
+ *
+ * @param queueName    queue to unbind, resolved through the session state
+ * @param exchangeName exchange to unbind from
+ * @param routingKey   binding key identifying the binding
+ * @throws NotFoundException if either the queue or the exchange resolves to
+ *         a null pointer
+ */
+void
+SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
+                                            const string& exchangeName,
+                                            const string& routingKey)
+{
+    Queue::shared_ptr queue = state.getQueue(queueName);
+    // Report the entity that is actually missing: the queue, by its own name.
+    if (!queue.get()) throw NotFoundException("Unbind failed. No such queue: " + queueName);
+
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+    //TODO: revise unbind to rely solely on binding key (not args)
+    if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) {
+        getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+    }
+
+}
+
+/**
+ * Answers whether a binding matching the given queue, key and arguments
+ * exists on an exchange.
+ *
+ * The five flags of the result are filled as follows by this function:
+ * first = exchange could not be obtained, second = a queue name was given
+ * but no such queue was found, and the last three = the queue, the key or
+ * the arguments (respectively) were specified but did not match on their
+ * own. All flags are false when the combined criteria match.
+ */
+Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
+                                                                  const std::string& queueName,
+                                                                  const std::string& key,
+                                                                  const framing::FieldTable& args)
+{
+    // A failed exchange lookup is reported through the result, not rethrown.
+    Exchange::shared_ptr exchange;
+    try {
+        exchange = getBroker().getExchanges().get(exchangeName);
+    } catch (const ChannelException&) {}
+
+    const bool queueRequested = !queueName.empty();
+    Queue::shared_ptr queue;
+    if (queueRequested) {
+        queue = getBroker().getQueues().find(queueName);
+    }
+
+    if (!exchange) {
+        return Exchange010BoundResult(true, false, false, false, false);
+    }
+    if (queueRequested && !queue) {
+        return Exchange010BoundResult(false, true, false, false, false);
+    }
+
+    // NOTE(review): the original passed `args.count() > 0 ? &args : &args`,
+    // i.e. &args in both cases. A null pointer for empty args (mirroring the
+    // key handling) may have been intended -- confirm before changing.
+    if (exchange->isBound(queue, key.empty() ? 0 : &key, &args)) {
+        return Exchange010BoundResult(false, false, false, false, false);
+    }
+
+    //need to test each specified option individually
+    bool queueMatched = !queueRequested || exchange->isBound(queue, 0, 0);
+    bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
+    bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
+
+    return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+}
+
+// Builds a query result describing the named queue: its name, the name of
+// its alternate exchange (empty string when it has none), durability,
+// whether it has an exclusive owner, auto-delete flag, settings, and the
+// current message and consumer counts.
+Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
+{
+    Queue::shared_ptr queue = state.getQueue(name);
+    Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
+
+    string alternateName;
+    if (alternateExchange) {
+        alternateName = alternateExchange->getName();
+    }
+
+    return Queue010QueryResult(queue->getName(),
+                               alternateName,
+                               queue->isDurable(),
+                               queue->hasExclusiveOwner(),
+                               queue->isAutoDelete(),
+                               queue->getSettings(),
+                               queue->getMessageCount(),
+                               queue->getConsumerCount());
+}
+
+/**
+ * Declares a queue, or (passive with a non-empty name) only looks it up.
+ *
+ * A non-empty alternateExchange is resolved first. Unless this is a passive
+ * declare of a named queue, the queue is declared in the queue registry.
+ * A newly created queue gets its alternate exchange attached, has its
+ * settings applied via create(), is bound to the default exchange under its
+ * name, and -- when exclusive -- is added to the connection's list of
+ * exclusive queues. For an already existing queue an exclusive request tries
+ * to take exclusive ownership.
+ *
+ * @throws ResourceLockedException if exclusive access was requested but this
+ *         connection is not the queue's exclusive owner afterwards
+ */
+void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
+                                               bool passive, bool durable, bool exclusive,
+                                               bool autoDelete, const qpid::framing::FieldTable& arguments){
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = getBroker().getExchanges().get(alternateExchange);
+    }
+
+    Queue::shared_ptr queue;
+    if (!passive || name.empty()) {
+        std::pair<Queue::shared_ptr, bool> declared =
+            getBroker().getQueues().declare(
+                name, durable,
+                autoDelete,
+                exclusive ? &getConnection() : 0);
+        queue = declared.first;
+        assert(queue);
+
+        const bool isNew = declared.second;
+        if (isNew) {
+            if (alternate) {
+                queue->setAlternateExchange(alternate);
+                alternate->incAlternateUsers();
+            }
+
+            //apply settings & create persistent record if required
+            declared.first->create(arguments);
+
+            //add default binding:
+            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
+
+            //handle automatic cleanup:
+            if (exclusive) {
+                getConnection().exclusiveQueues.push_back(queue);
+            }
+        } else if (exclusive && queue->setExclusiveOwner(&getConnection())) {
+            // Existing queue: ownership was just acquired by this connection.
+            getConnection().exclusiveQueues.push_back(queue);
+        }
+    } else {
+        queue = state.getQueue(name);
+        //TODO: check alternate-exchange is as expected
+    }
+
+    if (exclusive && !queue->isExclusiveOwner(&getConnection())) {
+        throw ResourceLockedException(
+            QPID_MSG("Cannot grant exclusive access to queue "
+                     << queue->getName()));
+    }
+}
+
+
+// Purges the queue with the given name, as resolved through the session state.
+void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
+    Queue::shared_ptr target = state.getQueue(queue);
+    target->purge();
+}
+
+/**
+ * Deletes the named queue.
+ *
+ * If this connection is the queue's exclusive owner, the queue is first
+ * removed from the connection's list of exclusive queues. The queue is then
+ * destroyed, removed from the queue registry and unbound from the exchanges,
+ * in that order.
+ *
+ * @param queue    name of the queue, resolved through the session state
+ * @param ifUnused refuse to delete while the queue has consumers
+ * @param ifEmpty  refuse to delete while the queue holds messages
+ * @throws PreconditionFailedException if ifEmpty is set and the queue is not
+ *         empty, or ifUnused is set and the queue has consumers
+ */
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
+    Queue::shared_ptr q = state.getQueue(queue);
+    if (ifEmpty && q->getMessageCount() > 0) {
+        throw PreconditionFailedException("Queue not empty.");
+    }
+    if (ifUnused && q->getConsumerCount() > 0) {
+        throw PreconditionFailedException("Queue in use.");
+    }
+
+    //remove the queue from the list of exclusive queues if necessary
+    if (q->isExclusiveOwner(&getConnection())) {
+        QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
+                                            getConnection().exclusiveQueues.end(), q);
+        // "Not found" is signalled by equality with end(); test it with !=.
+        if (i != getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+    }
+    q->destroy();
+    getBroker().getQueues().destroy(queue);
+    q->unbind(getBroker().getExchanges(), q);
+}
+
+
+// Sets up the ranged operations used by release(), reject() and accept():
+// each one forwards its two arguments to the matching SemanticState member
+// (release, reject, accepted) on this handler's state.
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s)
+    : HandlerImpl(s),
+      releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+      rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
+      acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+{
+}
+
+//
+// Message class method handlers
+//
+
+// Intentionally a no-op: all three arguments are ignored.
+void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
+                                                  uint8_t /*acceptMode*/,
+                                                  uint8_t /*acquireMode*/)
+{
+    // Not yet used (content-containing assemblies are treated differently at present).
+}
+
+// Applies the release operation to the given set of transfers.
+// The setRedelivered flag is currently ignored.
+void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool /*setRedelivered*/)
+{
+    transfers.for_each(releaseOp);
+}
+
+/**
+ * Subscribes a consumer, identified by destination, to the named queue.
+ *
+ * A non-empty destination that already exists in the session state is
+ * rejected. resumeId and resumeTtl are currently ignored.
+ *
+ * @throws NotAllowedException if the destination is already in use
+ */
+void
+SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
+                                              const string& destination,
+                                              uint8_t acceptMode,
+                                              uint8_t acquireMode,
+                                              bool exclusive,
+                                              const string& /*resumeId*/,//TODO implement resume behaviour
+                                              uint64_t /*resumeTtl*/,
+                                              const FieldTable& arguments)
+{
+    Queue::shared_ptr queue = state.getQueue(queueName);
+
+    const bool tagInUse = !destination.empty() && state.exists(destination);
+    if (tagInUse) {
+        throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+    }
+
+    // NOTE(review): the two flags below are derived from acceptMode == 1 and
+    // acquireMode == 0 -- confirm these values against the spec's encoding of
+    // accept-mode / acquire-mode.
+    const bool acceptFlag = (acceptMode == 1);
+    const bool acquireFlag = (acquireMode == 0);
+
+    string tag = destination;
+    state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode),
+                  tag, queue, false, //TODO get rid of no-local
+                  acceptFlag, acquireFlag, exclusive, &arguments);
+}
+
+// Cancels the subscription identified by destination on the session state.
+void SessionAdapter::MessageHandlerImpl::cancel(const string& destination)
+{
+    state.cancel(destination);
+}
+
+// Applies the reject operation to the given set of transfers.
+// The reject code and text are currently ignored.
+void SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers,
+                                                uint16_t /*code*/,
+                                                const string& /*text*/)
+{
+    transfers.for_each(rejectOp);
+}
+
+/**
+ * Adds credit to the subscription identified by destination.
+ *
+ * @param destination the subscription to credit
+ * @param unit        0 adds message credit, 1 adds byte credit
+ * @param value       amount of credit to add
+ * @throws SyntaxErrorException for any other unit value
+ */
+void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, uint8_t unit, uint32_t value)
+{
+    switch (unit) {
+      case 0:
+        //message
+        state.addMessageCredit(destination, value);
+        break;
+      case 1:
+        //bytes
+        state.addByteCredit(destination, value);
+        break;
+      default:
+        //unknown
+        // Stream the unit as a number: an 8-bit value would otherwise be
+        // inserted into the message as a raw character.
+        throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << static_cast<int>(unit)));
+    }
+}
+
+/**
+ * Selects the flow-control mode of the subscription identified by destination.
+ *
+ * @param destination the subscription to configure
+ * @param mode        0 selects credit mode, 1 selects window mode
+ * @throws SyntaxErrorException for any other mode value
+ */
+void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode)
+{
+    switch (mode) {
+      case 0:
+        //credit
+        state.setCreditMode(destination);
+        break;
+      case 1:
+        //window
+        state.setWindowMode(destination);
+        break;
+      default:
+        // Stream the mode as a number: an 8-bit value would otherwise be
+        // inserted into the message as a raw character.
+        throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << static_cast<int>(mode)));
+    }
+}
+
+// Forwards a flush request for the given destination to the session state.
+void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination) {
+    state.flush(destination);
+}
+
+// Forwards a stop request for the given destination to the session state.
+void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) {
+    state.stop(destination);
+}
+
+// Applies the accept operation to the given set of commands.
+void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) {
+    commands.for_each(acceptOp);
+}
+
+/*
+void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
+{
+ SequenceNumberSet results;
+ RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
+ transfers.processRanges(op);
+ results = results.condense();
+ getProxy().getMessage().acquired(results);
+}
+*/
+
+}} // namespace qpid::broker
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=633627&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Tue Mar 4 12:42:19 2008
@@ -0,0 +1,183 @@
+#ifndef _broker_SessionAdapter_h
+#define _broker_SessionAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "HandlerImpl.h"
+
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/SequenceSet.h"
+
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Channel;
+class Connection;
+class Broker;
+
+/**
+ * Protocol adapter for the "010" exchange, queue and message classes.
+ *
+ * A container for a collection of AMQP-class adapters that translate
+ * AMQP method bodies into calls on the core Broker objects. Only the
+ * Message010, Exchange010 and Queue010 handlers are provided; every
+ * other handler accessor throws NotImplementedException.
+ *
+ * The original note said each adapter class also provides a client
+ * proxy to send methods to the peer; that is not visible in this
+ * header (presumably inherited via HandlerImpl -- confirm).
+ */
+class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
+{
+  public:
+    SessionAdapter(SemanticState& session);
+
+    // Protocol version, as reported by the session's connection.
+    framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
+    // Handlers implemented by this adapter (backed by the members below).
+    Message010Handler* getMessage010Handler(){ return &messageImpl; }
+    Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
+    Queue010Handler* getQueue010Handler(){ return &queueImpl; }
+
+    // Handlers not provided by this adapter: each accessor throws.
+    BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+    AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+    Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+    Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+
+  private:
+    class ExchangeHandlerImpl :
+        public Exchange010Handler,
+        public HandlerImpl
+    {
+      public:
+        ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+        
+        void declare(const std::string& exchange, const std::string& type, 
+                     const std::string& alternateExchange, 
+                     bool passive, bool durable, bool autoDelete, 
+                     const qpid::framing::FieldTable& arguments); 
+        void delete_(const std::string& exchange, bool ifUnused); 
+        framing::Exchange010QueryResult query(const std::string& name);
+        void bind(const std::string& queue, 
+                  const std::string& exchange, const std::string& routingKey,
+                  const qpid::framing::FieldTable& arguments); 
+        void unbind(const std::string& queue,
+                    const std::string& exchange,
+                    const std::string& routingKey);
+        framing::Exchange010BoundResult bound(const std::string& exchange,
+                                           const std::string& queue,
+                                           const std::string& routingKey,
+                                           const framing::FieldTable& arguments);
+      private:
+        void checkType(shared_ptr<Exchange> exchange, const std::string& type); // throws if a non-empty type differs
+
+        void checkAlternate(shared_ptr<Exchange> exchange, 
+                            shared_ptr<Exchange> alternate); // throws if a requested alternate differs
+    };
+
+    class QueueHandlerImpl :
+        public Queue010Handler,
+        public HandlerImpl
+    {
+      public:
+        QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+        
+        void declare(const std::string& queue,
+                     const std::string& alternateExchange, 
+                     bool passive, bool durable, bool exclusive, 
+                     bool autoDelete,
+                     const qpid::framing::FieldTable& arguments); 
+        void delete_(const std::string& queue,
+                     bool ifUnused, bool ifEmpty);
+        void purge(const std::string& queue); 
+        framing::Queue010QueryResult query(const std::string& queue);
+    };
+
+    class MessageHandlerImpl :
+        public Message010Handler,
+        public HandlerImpl
+    {
+        typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;    
+        RangedOperation releaseOp; // used by release()
+        RangedOperation rejectOp; // used by reject()
+        RangedOperation acceptOp; // used by accept()
+
+      public:
+        MessageHandlerImpl(SemanticState& session);
+        void transfer(const string& destination,
+                      uint8_t acceptMode,
+                      uint8_t acquireMode);
+        
+        void accept(const framing::SequenceSet& commands);
+        
+        void reject(const framing::SequenceSet& commands,
+                    uint16_t code,
+                    const string& text);
+        
+        void release(const framing::SequenceSet& commands,
+                     bool setRedelivered);
+        
+        void subscribe(const string& queue,
+                       const string& destination,
+                       uint8_t acceptMode,
+                       uint8_t acquireMode,
+                       bool exclusive,
+                       const string& resumeId,
+                       uint64_t resumeTtl,
+                       const framing::FieldTable& arguments);
+        
+        void cancel(const string& destination);
+        
+        void setFlowMode(const string& destination,
+                         uint8_t flowMode);
+        
+        void flow(const string& destination,
+                  uint8_t unit,
+                  uint32_t value);
+        
+        void flush(const string& destination);
+        
+        void stop(const string& destination);
+    
+    };
+
+    ExchangeHandlerImpl exchangeImpl;
+    QueueHandlerImpl queueImpl;
+    MessageHandlerImpl messageImpl;
+};
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_SessionAdapter_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Mar 4 12:42:19 2008
@@ -31,7 +31,7 @@
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
-#include "BrokerAdapter.h"
+#include "SessionAdapter.h"
#include "DeliveryAdapter.h"
#include "MessageBuilder.h"
#include "SessionContext.h"
@@ -132,7 +132,7 @@
sys::Mutex lock;
SemanticState semanticState;
- BrokerAdapter adapter;
+ SessionAdapter adapter;
MessageBuilder msgBuilder;
RangedOperation ackOp;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Tue Mar 4 12:42:19 2008
@@ -30,7 +30,7 @@
FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);} // Stores the id and reserves capacity for 4 entries in parts up front.
-void FrameSet::append(AMQFrame& part)
+void FrameSet::append(const AMQFrame& part) // Adds part to parts via push_back; this revision only changes the parameter to a const reference.
{
parts.push_back(part);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Tue Mar 4 12:42:19 2008
@@ -43,7 +43,7 @@
typedef boost::shared_ptr<FrameSet> shared_ptr;
FrameSet(const SequenceNumber& id);
- void append(AMQFrame& part);
+ void append(const AMQFrame& part);
bool isComplete() const;
uint64_t getContentSize() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h Tue Mar 4 12:42:19 2008
@@ -33,9 +33,9 @@
mutable ExecutionHeader header;
public:
virtual ~ModelMethod() {}
- virtual void encode(Buffer& buffer) const { header.encode(buffer); }
- virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }
- virtual uint32_t size() const { return header.size(); }
+ virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); } // Forwards to header.encode(buffer); replaces the encode() removed in this hunk.
+ virtual void decodeHeader(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); } // Forwards buffer and size to header.decode(); replaces the decode() removed in this hunk.
+ virtual uint32_t headerSize() const { return header.size(); } // Returns header.size(); replaces the size() removed in this hunk.
virtual bool isSync() const { return header.getSync(); } // Returns the value of header.getSync().
virtual void setSync(bool on) const { header.setSync(on); } // Declared const yet calls header.setSync(on); this compiles because header is declared mutable.
ExecutionHeader& getHeader() { return header; } // Non-const accessor returning a modifiable reference to header.
Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=633627&r1=633626&r2=633627&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Tue Mar 4 12:42:19 2008
@@ -582,4 +582,185 @@
</class>
+<class name="message010" index="4"> <!-- Class message010 (index 4): ten methods, indexes 1 to 4 and 7 to 12, each with a server chassis marked MUST. Method names and field lists mirror the MessageHandlerImpl declarations in SessionAdapter.h from the same commit. -->
+ <doc>blah, blah</doc> <!-- Placeholder text; every doc element in this class is a placeholder. -->
+ <method name = "transfer" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ <field name="accept-mode" domain="octet"/> <!-- octet fields appear as uint8_t in MessageHandlerImpl. -->
+ <field name="acquire-mode" domain="octet"/>
+ </method>
+ <method name = "accept" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="commands" domain="sequence-set"/> <!-- sequence-set fields appear as framing::SequenceSet in MessageHandlerImpl. -->
+ </method>
+ <method name = "reject" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="commands" domain="sequence-set"/>
+ <field name="code" domain="short"/> <!-- short appears as uint16_t in MessageHandlerImpl::reject. -->
+ <field name="text" domain="shortstr"/>
+ </method>
+ <method name = "release" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="commands" domain="sequence-set"/>
+ <field name="set-redelivered" domain="bit"/> <!-- bit appears as bool in MessageHandlerImpl::release. -->
+ </method>
+ <!-- Method indexes 5 and 6 are not defined in this class; numbering resumes at 7. -->
+
+ <method name = "subscribe" index="7">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="queue" domain="shortstr"/>
+ <field name="destination" domain="shortstr"/>
+ <field name="accept-mode" domain="octet"/>
+ <field name="acquire-mode" domain="octet"/>
+ <field name="exclusive" domain="bit"/>
+ <field name="resume-id" domain="mediumstr"/> <!-- The only mediumstr field in this class; it appears as string in MessageHandlerImpl::subscribe. -->
+ <field name="resume-ttl" domain="longlong"/> <!-- longlong appears as uint64_t in MessageHandlerImpl::subscribe. -->
+ <field name="arguments" domain="table"/> <!-- table appears as framing::FieldTable in MessageHandlerImpl::subscribe. -->
+ </method>
+ <method name = "cancel" index="8">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ </method>
+ <method name = "set-flow-mode" index="9">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ <field name="flow-mode" domain="octet"/>
+ </method>
+ <method name = "flow" index="10">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ <field name="unit" domain="octet"/>
+ <field name="value" domain="long"/> <!-- long appears as uint32_t in MessageHandlerImpl::flow. -->
+ </method>
+ <method name = "flush" index="11">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ </method>
+ <method name = "stop" index="12">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="destination" domain="shortstr"/>
+ </method>
+</class>
+
+<class name="exchange010" index="7"> <!-- Class exchange010 (index 7): six methods (declare, delete, query, bind, unbind, bound), each with a server chassis marked MUST; query and bound also declare a result struct. -->
+ <doc>blah, blah</doc> <!-- Placeholder text; every doc element in this class is a placeholder. -->
+ <method name = "declare" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <field name="type" domain="shortstr"/>
+ <field name="alternate-exchange" domain="shortstr"/>
+ <field name="passive" domain="bit"/>
+ <field name="durable" domain="bit"/>
+ <field name="auto-delete" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name = "delete" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <field name="if-unused" domain="bit"/>
+ </method>
+ <method name = "query" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <result> <!-- Result struct with fields type, durable, not-found and arguments. -->
+ <struct size="long" type="1"> <!-- NOTE(review): the "bound" result struct in this class also declares type="1"; confirm whether struct types must be unique within a class. -->
+ <field name="type" domain="shortstr"/>
+ <field name="durable" domain="bit"/>
+ <field name="not-found" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </struct>
+ </result>
+ </method>
+ <method name = "bind" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="queue" domain="shortstr"/>
+ <field name="exchange" domain="shortstr"/>
+ <field name="binding-key" domain="shortstr"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name = "unbind" index="5"> <!-- Same fields as bind, minus arguments. -->
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="queue" domain="shortstr"/>
+ <field name="exchange" domain="shortstr"/>
+ <field name="binding-key" domain="shortstr"/>
+ </method>
+ <method name = "bound" index="6"> <!-- Takes the same four fields as bind and declares a result struct of five bit flags. -->
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="queue" domain="shortstr"/>
+ <field name="exchange" domain="shortstr"/>
+ <field name="binding-key" domain="shortstr"/>
+ <field name="arguments" domain="table"/>
+ <result>
+ <struct size="long" type="1"> <!-- NOTE(review): same type value as the "query" result struct above; confirm this is intended. -->
+ <field name="exchange-not-found" domain="bit"/>
+ <field name="queue-not-found" domain="bit"/>
+ <field name="queue-not-matched" domain="bit"/>
+ <field name="key-not-matched" domain="bit"/>
+ <field name="arguments-not-matched" domain="bit"/>
+ </struct>
+ </result>
+ </method>
+</class>
+
+<class name="queue010" index="8"> <!-- Class queue010 (index 8): four methods (declare, delete, purge, query), each with a server chassis marked MUST; only query declares a result struct. -->
+ <doc>blah, blah</doc> <!-- Placeholder text; every doc element in this class is a placeholder. -->
+ <method name = "declare" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <field name="alternate-exchange" domain="shortstr"/>
+ <field name="passive" domain="bit"/>
+ <field name="durable" domain="bit"/>
+ <field name="exclusive" domain="bit"/>
+ <field name="auto-delete" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name = "delete" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <field name="if-unused" domain="bit"/>
+ <field name="if-empty" domain="bit"/>
+ </method>
+ <method name = "purge" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ </method>
+ <method name = "query" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="name" domain="shortstr"/>
+ <result>
+ <struct size="long" type="1"> <!-- NOTE(review): this struct repeats passive, durable and auto-delete from "declare" but has no exclusive field; confirm this is intended. -->
+ <field name="name" domain="shortstr"/>
+ <field name="alternate-exchange" domain="shortstr"/>
+ <field name="passive" domain="bit"/>
+ <field name="durable" domain="bit"/>
+ <field name="auto-delete" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ <field name="message-count" domain="long"/>
+ <field name="subscriber-count" domain="long"/>
+ </struct>
+ </result>
+ </method>
+</class>
+
</amqp>