You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/30 20:27:56 UTC
svn commit: r469242 [1/2] - in /incubator/qpid/trunk/qpid/cpp:
src/qpid/broker/ test/unit/qpid/broker/ test/unit/qpid/concurrent/
Author: gsim
Date: Mon Oct 30 11:27:54 2006
New Revision: 469242
URL: http://svn.apache.org/viewvc?view=rev&rev=469242
Log:
Initial implementation for tx class.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (with props)
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp (with props)
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Router.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Router.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/RouterTest.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/AccumulatedAck.h"
+
+using std::less_equal;
+using std::bind2nd;
+using namespace qpid::broker;
+
+/**
+ * Records an acknowledgement for the given delivery tag.
+ *
+ * A 'multiple' ack acknowledges everything up to and including tag,
+ * so it can only ever raise range. A single ack is remembered in
+ * individual, but only when it lies beyond range; anything at or
+ * below range is already accounted for.
+ */
+void AccumulatedAck::update(u_int64_t tag, bool multiple){
+    if(multiple){
+        if(tag > range) range = tag;
+        //else don't care, it is already counted
+    }else if(tag > range){
+        //this used to test 'tag < range', which stored only tags that
+        //were already covered and dropped every genuinely new single ack
+        individual.push_back(tag);
+    }
+}
+
+/**
+ * Tidies the record: individually acked tags that the cumulative
+ * range already covers are dropped and the remainder is left in
+ * ascending order.
+ */
+void AccumulatedAck::consolidate(){
+    //prune first so that only the surviving tags need to be ordered;
+    //the resulting list is the same as sorting and then pruning
+    individual.remove_if(bind2nd(less_equal<u_int64_t>(), range));
+    individual.sort();
+}
+
+/**
+ * Forgets all recorded acks: empties the individual list and resets
+ * the cumulative range to zero.
+ */
+void AccumulatedAck::clear(){
+    individual.clear();
+    range = 0;
+}
+
+/**
+ * Returns true if tag has been acked, either cumulatively (it is at
+ * or below range) or individually (it appears in individual).
+ */
+bool AccumulatedAck::covers(u_int64_t tag) const{
+    //range is inclusive: update() stores the acked tag itself in it and
+    //consolidate() prunes tags <= range, so a strict '<' here would
+    //report the most recently multiple-acked message as not acked
+    return tag <= range || std::find(individual.begin(), individual.end(), tag) != individual.end();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+
+namespace qpid {
+ namespace broker {
+ /**
+  * Keeps an accumulated record of acked messages (by delivery
+  * tag). Acks are held either as a cumulative upper bound
+  * ('range') or as a list of individually acked tags.
+  */
+ struct AccumulatedAck{
+     /**
+      * If not zero, then everything up to this value has been
+      * acked.
+      */
+     u_int64_t range;
+     /**
+      * List of individually acked messages that are not
+      * included in the range marked by 'range'.
+      */
+     std::list<u_int64_t> individual;
+
+     /**
+      * Starts with nothing acked. Without this, 'range' held an
+      * indeterminate value until clear() was first called, yet
+      * update() compares against it straight away.
+      */
+     AccumulatedAck() : range(0) {}
+
+     /** Records an ack for tag; 'multiple' marks a cumulative ack. */
+     void update(u_int64_t tag, bool multiple);
+     /** Sorts 'individual' and drops entries already covered by 'range'. */
+     void consolidate();
+     /** Resets 'range' to zero and empties 'individual'. */
+     void clear();
+     /** Tests whether tag is recorded as acked. */
+     bool covers(u_int64_t tag) const;
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Mon Oct 30 11:27:54 2006
@@ -21,6 +21,8 @@
#include <sstream>
#include <assert.h>
+using std::mem_fun_ref;
+using std::bind2nd;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
@@ -29,14 +31,17 @@
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
id(_id),
out(_out),
- deliveryTag(1),
+ currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
framesize(_framesize),
- tagGenerator("sgen"){}
+ tagGenerator("sgen"),
+ store(0),
+ messageBuilder(this){
+
+ outstanding.reset();
+}
Channel::~Channel(){
}
@@ -86,30 +91,36 @@
}
void Channel::commit(){
-
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
}
void Channel::rollback(){
-
+ txBuffer.rollback();
+ accumulatedAck.clear();
}
void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t deliveryTag = currentDeliveryTag++;
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
@@ -144,43 +155,66 @@
if(blocked) queue->dispatch();
}
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
+void Channel::handlePublish(Message* _message, Exchange* _exchange){
+ Message::shared_ptr message(_message);
+ exchange = _exchange;
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
}
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void Channel::complete(Message::shared_ptr& msg){
+ if(exchange){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+ exchange = 0;
+ }else{
+ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
}
- message = Message::shared_ptr(msg);
}
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if(transactional){
+ accumulatedAck.update(deliveryTag, multiple);
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
}else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ if(i == unacked.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
}
- unacknowledged.erase(i);
- }
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
}
}
@@ -188,14 +222,12 @@
Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
}
}
@@ -203,10 +235,10 @@
Message::shared_ptr msg = queue->dequeue();
if(msg){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t myDeliveryTag = currentDeliveryTag++;
msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
return true;
}else{
@@ -214,43 +246,6 @@
}
}
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Mon Oct 30 11:27:54 2006
@@ -19,17 +19,29 @@
#define _Channel_
#include <algorithm>
+#include <functional>
+#include <list>
#include <map>
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/broker/AccumulatedAck.h"
#include "qpid/broker/Binding.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/DeletingTxOp.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NameGenerator.h"
-#include "qpid/framing/OutputHandler.h"
+#include "qpid/broker/Prefetch.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxAck.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
namespace qpid {
namespace broker {
@@ -37,8 +49,7 @@
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
- class Channel{
- private:
+ class Channel : private MessageBuilder::CompletionHandler{
class ConsumerImpl : public virtual Consumer{
Channel* parent;
string tag;
@@ -54,92 +65,29 @@
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
-
- struct AckRecord{
- Message::shared_ptr msg;
- Queue::shared_ptr queue;
- string consumerTag;
- u_int64_t deliveryTag;
- bool pull;
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const string _consumerTag,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
- };
-
- typedef std::vector<AckRecord>::iterator ack_iterator;
-
- class MatchAck{
- const u_int64_t tag;
- public:
- MatchAck(u_int64_t tag);
- bool operator()(AckRecord& record) const;
- };
-
- class Requeue{
- public:
- void operator()(AckRecord& record) const;
- };
-
- class Redeliver{
- Channel* const channel;
- public:
- Redeliver(Channel* const channel);
- void operator()(AckRecord& record) const;
- };
-
- class CalculatePrefetch{
- u_int32_t size;
- u_int16_t count;
- public:
- CalculatePrefetch();
- void operator()(AckRecord& record);
- u_int32_t getSize();
- u_int16_t getCount();
- };
-
const int id;
qpid::framing::OutputHandler* out;
- u_int64_t deliveryTag;
+ u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
std::map<string, ConsumerImpl*> consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
- u_int32_t outstandingSize;
- u_int16_t outstandingCount;
+ Prefetch outstanding;
u_int32_t framesize;
- Message::shared_ptr message;
NameGenerator tagGenerator;
- std::vector<AckRecord> unacknowledged;
+ std::list<DeliveryRecord> unacked;
qpid::concurrent::MonitorImpl deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ TransactionalStore* store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ Exchange* exchange;//exchange to which any in-progress message was published to
+ virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void checkMessage(const std::string& text);
- bool checkPrefetch(Message::shared_ptr& msg);
void cancel(consumer_iterator consumer);
-
- template<class Operation> Operation processMessage(Operation route){
- if(message->isComplete()){
- route(message);
- message.reset();
- }
- return route;
- }
-
+ bool checkPrefetch(Message::shared_ptr& msg);
public:
Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -158,37 +106,10 @@
void rollback();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
-
- /**
- * Handles the initial publish request though a
- * channel. The header and (if applicable) content will be
- * accumulated through calls to handleHeader() and
- * handleContent()
- */
- void handlePublish(Message* msg);
-
- /**
- * A template method that handles a received header and if
- * there is no content routes it using the functor passed
- * in.
- */
- template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
- checkMessage("Invalid message sequence: got header before publish.");
- message->setHeader(header);
- return processMessage(route);
- }
-
- /**
- * A template method that handles a received content and
- * if this completes the message, routes it using the
- * functor passed in.
- */
- template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
- checkMessage("Invalid message sequence: got content before publish.");
- message->addContent(content);
- return processMessage(route);
- }
-
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
+ void handlePublish(Message* msg, Exchange* exchange);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
};
struct InvalidAckException{};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp Mon Oct 30 11:27:54 2006
@@ -191,6 +191,8 @@
return false;
}
-void Configuration::BoolOption::setValue(const std::string& _value){
- value = strcasecmp(_value.c_str(), "true") == 0;
+void Configuration::BoolOption::setValue(const std::string& /*not required*/){
+ //BoolOptions have no value. The fact that the option is specified
+ //implies the value is true.
+ value = true;
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeletingTxOp.h"
+
+using namespace qpid::broker;
+
+/**
+ * Wraps the given operation. The pointer is stored as-is and is
+ * deleted by commit() or rollback(), whichever runs first.
+ */
+DeletingTxOp::DeletingTxOp(TxOp* const _delegate)
+    : delegate(_delegate)
+{
+}
+
+/**
+ * Forwards prepare() to the wrapped operation and returns its
+ * result. Returns false when there is no delegate (null, or already
+ * released by commit()/rollback()).
+ */
+bool DeletingTxOp::prepare() throw(){
+    if(!delegate) return false;
+    return delegate->prepare();
+}
+
+/**
+ * Commits the wrapped operation and then deletes it. Does nothing
+ * when the delegate is null or has already been released.
+ */
+void DeletingTxOp::commit() throw(){
+    if(!delegate) return;
+
+    delegate->commit();
+    delete delegate;
+    delegate = 0;//guards against a second commit/rollback reusing the freed object
+}
+
+/**
+ * Rolls back the wrapped operation and then deletes it. Does nothing
+ * when the delegate is null or has already been released.
+ */
+void DeletingTxOp::rollback() throw(){
+    if(!delegate) return;
+
+    delegate->rollback();
+    delete delegate;
+    delegate = 0;//guards against a second commit/rollback reusing the freed object
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeletingTxOp_
+#define _DeletingTxOp_
+
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * TxOp wrapper that will delegate calls & delete the object
+ * to which it delegates after completion of the transaction.
+ *
+ * The delegate is deleted, and the stored pointer zeroed, by
+ * whichever of commit() or rollback() runs first; afterwards
+ * prepare() returns false and commit()/rollback() do nothing.
+ *
+ * NOTE(review): the destructor is empty, so a wrapper destroyed
+ * before commit() or rollback() has run does not delete its
+ * delegate. The class declares no copy operations either, so a
+ * copy would share the same raw pointer - confirm neither case
+ * can occur in practice.
+ */
+ class DeletingTxOp : public virtual TxOp{
+ // operation being wrapped; zero once it has been deleted
+ TxOp* delegate;
+ public:
+ // stores the given pointer; see the deletion behaviour described above
+ DeletingTxOp(TxOp* const delegate);
+ // forwards to the delegate; false if there is no delegate
+ virtual bool prepare() throw();
+ // forwards to the delegate, then deletes it
+ virtual void commit() throw();
+ // forwards to the delegate, then deletes it
+ virtual void rollback() throw();
+ // does not delete the delegate
+ virtual ~DeletingTxOp(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Deliverable_
+#define _Deliverable_
+
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Interface for something that can be delivered to a queue.
+ * Implementations decide what delivering to the queue they
+ * are handed actually involves.
+ */
+ class Deliverable{
+ public:
+ // called with each queue this deliverable should be delivered to
+ virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ // virtual so implementations can be destroyed through this interface
+ virtual ~Deliverable(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeliverableMessage.h"
+
+using namespace qpid::broker;
+
+/**
+ * Creates a deliverable that holds its own shared pointer to the
+ * given message.
+ */
+DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) {}
+
+/**
+ * Hands the held message straight to the given queue by calling
+ * its deliver() method.
+ */
+void DeliverableMessage::deliverTo(Queue::shared_ptr& queue){
+    queue->deliver(msg);
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliverableMessage_
+#define _DeliverableMessage_
+
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Deliverable that passes its message directly to each queue
+ * it is delivered to.
+ */
+ class DeliverableMessage : public Deliverable{
+ // the message handed to every target queue
+ Message::shared_ptr msg;
+ public:
+ // keeps a shared pointer to msg
+ DeliverableMessage(Message::shared_ptr& msg);
+ // calls queue->deliver() with the held message
+ virtual void deliverTo(Queue::shared_ptr& queue);
+ virtual ~DeliverableMessage(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/Channel.h"
+
+using namespace qpid::broker;
+
+/**
+ * Builds the record for a delivery that carries a consumer tag.
+ * The record is marked as not pulled, so it takes part in the
+ * prefetch accounting done by addTo()/subtractFrom().
+ */
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+                               Queue::shared_ptr _queue,
+                               const string _consumerTag,
+                               const u_int64_t _deliveryTag)
+    : msg(_msg),
+      queue(_queue),
+      consumerTag(_consumerTag),
+      deliveryTag(_deliveryTag),
+      pull(false)
+{
+}
+
+/**
+ * Builds the record for a delivery that has no consumer tag.
+ * The consumer tag is left empty and the record is marked as
+ * pulled, which excludes it from prefetch accounting and makes
+ * redeliver() requeue it.
+ */
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+                               Queue::shared_ptr _queue,
+                               const u_int64_t _deliveryTag)
+    : msg(_msg),
+      queue(_queue),
+      consumerTag(""),
+      deliveryTag(_deliveryTag),
+      pull(true)
+{
+}
+
+
+/**
+ * Dequeues the recorded message from the recorded queue, passing
+ * a null transaction identifier.
+ */
+void DeliveryRecord::discard() const{
+    const string* const noXid = 0;
+    queue->dequeue(msg, noXid);
+}
+
+/**
+ * Reports whether this record carries the given delivery tag.
+ */
+bool DeliveryRecord::matches(u_int64_t tag) const{
+    return tag == deliveryTag;
+}
+
+/**
+ * Asks the accumulated ack whether it covers this record's
+ * delivery tag and returns its answer.
+ */
+bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+    const bool covered = range->covers(deliveryTag);
+    return covered;
+}
+
+/**
+ * Discards the recorded message only when the accumulated ack
+ * covers this record's delivery tag; otherwise does nothing.
+ */
+void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{
+    if(!coveredBy(range)) return;
+    discard();
+}
+
+/**
+ * Sends the recorded message out again. A delivery with a
+ * consumer tag goes back through the channel with its original
+ * consumer and delivery tags; a pulled delivery is requeued.
+ */
+void DeliveryRecord::redeliver(Channel* const channel) const{
+    if(!pull){
+        channel->deliver(msg, consumerTag, deliveryTag);
+        return;
+    }
+    //if message was originally sent as response to get, we must requeue it
+    requeue();
+}
+
+void DeliveryRecord::requeue() const{ // puts the recorded message back on the recorded queue
+ msg->redeliver(); // presumably flags the message as redelivered -- TODO confirm in Message
+ queue->deliver(msg); // hand the message to the queue held by this record
+}
+
+/**
+ * Adds this delivery to the given prefetch totals: the message's
+ * content size is added to size and count is incremented.
+ *
+ * NOTE(review): Message::contentSize() returns u_int64_t while
+ * Prefetch::size is u_int32_t, so the addition is narrowed.
+ */
+void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+    //'pulled' messages (i.e. those that were sent in response to
+    //get) are ignored when calculating prefetch
+    if(pull) return;
+
+    prefetch->size += msg->contentSize();
+    prefetch->count++;
+}
+
+/**
+ * Removes this delivery from the given prefetch totals: the
+ * message's content size is subtracted from size and count is
+ * decremented.
+ *
+ * NOTE(review): Message::contentSize() returns u_int64_t while
+ * Prefetch::size is u_int32_t, so the subtraction is narrowed.
+ */
+void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+    //'pulled' messages (i.e. those that were sent in response to
+    //get) are ignored when calculating prefetch
+    if(pull) return;
+
+    prefetch->size -= msg->contentSize();
+    prefetch->count--;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliveryRecord_
+#define _DeliveryRecord_
+
+#include <algorithm>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Prefetch.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Channel;
+
+ /**
+ * Record of a delivery for which an ack is outstanding.
+ *
+ * Holds the delivered message, the queue associated with the
+ * delivery, the delivery tag and (when one was supplied) the
+ * consumer tag.
+ */
+ class DeliveryRecord{
+ mutable Message::shared_ptr msg; // mutable: const methods pass it to Queue calls taking a non-const reference
+ mutable Queue::shared_ptr queue; // queue supplied at construction; used by discard() and requeue()
+ string consumerTag; // empty when the record was built without a consumer tag
+ u_int64_t deliveryTag; // tag compared by matches() and coveredBy()
+ bool pull; // true when built without a consumer tag (message sent in response to get)
+
+ public:
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag); // sets pull to false
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); // sets pull to true, consumerTag to ""
+
+ void discard() const; // dequeues msg from queue with a null xid
+ bool matches(u_int64_t tag) const; // true when tag equals deliveryTag
+ bool coveredBy(const AccumulatedAck* const range) const; // true when range->covers(deliveryTag)
+ void discardIfCoveredBy(const AccumulatedAck* const range) const; // discard() only when coveredBy(range)
+ void requeue() const; // calls msg->redeliver() then queue->deliver(msg)
+ void redeliver(Channel* const) const; // requeues pulled deliveries, otherwise re-sends through the channel
+ void addTo(Prefetch* const prefetch) const; // adds size/count to prefetch unless pull is set
+ void subtractFrom(Prefetch* const prefetch) const; // subtracts size/count from prefetch unless pull is set
+ };
+
+ typedef std::list<DeliveryRecord>::iterator ack_iterator;
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Oct 30 11:27:54 2006
@@ -51,12 +51,12 @@
lock.release();
}
-void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
lock.acquire();
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- (*i)->deliver(msg);
+ msg.deliverTo(*i);
}
if(!count){
std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Mon Oct 30 11:27:54 2006
@@ -41,7 +41,7 @@
virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~DirectExchange();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Oct 30 11:27:54 2006
@@ -18,9 +18,9 @@
#ifndef _Exchange_
#define _Exchange_
-#include "qpid/framing/FieldTable.h"
-#include "qpid/broker/Message.h"
+#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
namespace qpid {
namespace broker {
@@ -32,7 +32,7 @@
std::string getName() { return name; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Oct 30 11:27:54 2006
@@ -44,10 +44,10 @@
}
}
-void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){
Locker locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- (*i)->deliver(msg);
+ msg.deliverTo(*i);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Mon Oct 30 11:27:54 2006
@@ -42,7 +42,7 @@
virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~FanOutExchange();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Oct 30 11:27:54 2006
@@ -58,10 +58,10 @@
}
-void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){
Locker locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) i->second->deliver(msg);
+ if (match(i->first, *args)) msg.deliverTo(i->second);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Mon Oct 30 11:27:54 2006
@@ -45,7 +45,7 @@
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
virtual ~HeadersExchange();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Oct 30 11:27:54 2006
@@ -17,7 +17,6 @@
*/
#include "qpid/concurrent/MonitorImpl.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/ExchangeRegistry.h"
#include <iostream>
using namespace boost;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Oct 30 11:27:54 2006
@@ -19,16 +19,16 @@
#define _Message_
#include <boost/shared_ptr.hpp>
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/BasicHeaderProperties.h"
#include "qpid/framing/BasicPublishBody.h"
-#include "qpid/broker/ConnectionToken.h"
#include "qpid/framing/OutputHandler.h"
namespace qpid {
namespace broker {
- class ExchangeRegistry;
/**
* Represents an AMQP message, i.e. a header body, a list of
@@ -48,6 +48,7 @@
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
u_int64_t size;
+ TxBuffer* tx;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
@@ -79,8 +80,9 @@
qpid::framing::BasicHeaderProperties* getHeaderProperties();
const string& getRoutingKey() const { return routingKey; }
const string& getExchange() const { return exchange; }
- u_int64_t contentSize() const{ return size; }
-
+ u_int64_t contentSize() const { return size; }
+ TxBuffer* getTx() const { return tx; }
+ void setTx(TxBuffer* _tx) { tx = _tx; }
};
}
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageBuilder.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+/**
+ * Creates a builder that reports completed messages to _handler.
+ * The pointer is stored as given; no message is in progress yet.
+ */
+MessageBuilder::MessageBuilder(CompletionHandler* _handler)
+    : handler(_handler)
+{
+}
+
+/**
+ * Hands the message under construction to the completion handler
+ * (if one was supplied) once the message reports itself complete,
+ * and clears the builder so the next publish can start.
+ *
+ * The builder's own reference is released before the handler runs.
+ * If complete() throws, the builder is therefore already empty and
+ * the next initialise() is not rejected with a misleading
+ * "got publish before previous content was completed" error.
+ */
+void MessageBuilder::route(){
+    if(!message->isComplete()) return;
+
+    // keep the message alive locally while the member is cleared
+    Message::shared_ptr completed(message);
+    message.reset();
+    if(handler) handler->complete(completed);
+}
+
+/**
+ * Starts building a new message. Raises a protocol error (504) if
+ * a previous message is still held by the builder.
+ */
+void MessageBuilder::initialise(Message::shared_ptr& msg){
+    const bool inProgress = (message.get() != 0);
+    if(inProgress){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+    }
+    message = msg;
+}
+
+/**
+ * Attaches the header to the message being built, then routes the
+ * message if that completed it. Raises a protocol error (504) if
+ * no message has been initialised.
+ */
+void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+    if(message.get() == 0){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+    }
+    message->setHeader(header);
+    route();
+}
+
+/**
+ * Appends a content body to the message being built, then routes
+ * the message if that completed it. Raises a protocol error (504)
+ * if no message has been initialised.
+ */
+void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+    if(message.get() == 0){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+    }
+    message->addContent(content);
+    route();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageBuilder_
+#define _MessageBuilder_
+
+#include "qpid/QpidError.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageBuilder{ // assembles a message from publish, header and content parts and reports it when complete
+ public:
+ class CompletionHandler{ // callback interface notified when a message is complete
+ public:
+ virtual void complete(Message::shared_ptr&) = 0; // receives the completed message
+ virtual ~CompletionHandler(){}
+ };
+ MessageBuilder(CompletionHandler* _handler); // stores the handler pointer as given
+ void initialise(Message::shared_ptr& msg); // starts a new message; throws if one is already in progress
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); // throws if called before initialise()
+ void addContent(qpid::framing::AMQContentBody::shared_ptr& content); // throws if called before initialise()
+ private:
+ Message::shared_ptr message; // message in progress; empty between messages
+ CompletionHandler* handler; // may be null; checked before use in route()
+
+ void route(); // passes the message to the handler once it is complete, then clears it
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An abstraction of the persistent storage for messages.
+ */
+ class MessageStore : public TransactionalStore{
+ public:
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being committed.
+ */
+ virtual void committed(const string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being aborted.
+ */
+ virtual void aborted(const string * const xid) = 0;
+
+ virtual ~MessageStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/Prefetch.h"
+
+using namespace qpid::broker;
+
+/**
+ * Zeroes both running totals.
+ */
+void Prefetch::reset(){
+    count = 0;
+    size = 0;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Count and total size of asynchronously delivered
+ * (i.e. pushed) messages that have acks outstanding.
+ *
+ * NOTE(review): no constructor is declared, so callers should
+ * call reset() (or otherwise initialise both fields) before
+ * reading them.
+ */
+ struct Prefetch{
+ u_int32_t size; // running total of content sizes
+ u_int16_t count; // running number of messages
+
+ void reset(); // sets size and count to zero
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 30 11:27:54 2006
@@ -16,16 +16,21 @@
*
*/
#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/concurrent/MonitorImpl.h"
#include <iostream>
using namespace qpid::broker;
using namespace qpid::concurrent;
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
+ MessageStore* const _store,
+ const ConnectionToken* const _owner) :
+
name(_name),
autodelete(_autodelete),
durable(_durable),
+ store(_store),
owner(_owner),
queueing(false),
dispatching(false),
@@ -48,6 +53,11 @@
}
void Queue::deliver(Message::shared_ptr& msg){ // entry point for a message arriving at this queue
+ enqueue(msg, 0); // record the message as enqueued, with a null xid
+ process(msg); // then dispatch it or hold it for later
+}
+
+void Queue::process(Message::shared_ptr& msg){
Locker locker(lock);
if(queueing || !dispatch(msg)){
queueing = true;
@@ -152,4 +162,18 @@
bool Queue::canAutoDelete() const{
Locker locker(lock);
return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}
+
+void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ // records msg as enqueued in the store when persistent
+ bool persistent(false);//TODO: pull this from headers (hard-coded false, so the store is never called yet)
+ if(persistent){
+ store->enqueue(msg, name, xid); // NOTE(review): store defaults to 0 in the Queue constructor; needs a null check once persistent can be true
+ }
+}
+
+void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ // records msg as dequeued in the store when persistent
+ bool persistent(false);//TODO: pull this from headers (hard-coded false, so the store is never called yet)
+ if(persistent){
+ store->dequeue(msg, name, xid); // NOTE(review): store defaults to 0 in the Queue constructor; needs a null check once persistent can be true
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 30 11:27:54 2006
@@ -31,6 +31,7 @@
namespace qpid {
namespace broker {
+ class MessageStore;
/**
* Thrown when exclusive access would be violated.
@@ -47,6 +48,7 @@
const string name;
const u_int32_t autodelete;
const bool durable;
+ MessageStore* const store;
const ConnectionToken* const owner;
std::vector<Consumer*> consumers;
std::queue<Binding*> bindings;
@@ -67,7 +69,9 @@
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ Queue(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ MessageStore* const store = 0,
+ const ConnectionToken* const owner = 0);
~Queue();
/**
* Informs the queue of a binding that should be cancelled on
@@ -75,13 +79,16 @@
*/
void bound(Binding* b);
/**
- * Delivers a message to the queue from where it will be
- * dispatched to immediately to a consumer if one is
- * available or stored for dequeue or later dispatch if
- * not.
+ * Delivers a message to the queue. Will record it as
+ * enqueued if persistent then process it.
*/
void deliver(Message::shared_ptr& msg);
/**
+ * Dispatches the message immediately to a consumer if
+ * one is available or stores it for later if not.
+ */
+ void process(Message::shared_ptr& msg);
+ /**
* Dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, but this method (rather than the caller)
@@ -98,6 +105,9 @@
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
bool canAutoDelete() const;
+
+ void enqueue(Message::shared_ptr& msg, const string * const xid);
+ void dequeue(Message::shared_ptr& msg, const string * const xid);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Oct 30 11:27:54 2006
@@ -29,14 +29,15 @@
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete,
+ MessageStore* const store, const ConnectionToken* owner)
{
Locker locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+ Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner));
queues[name] = queue;
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Oct 30 11:27:54 2006
@@ -46,7 +46,9 @@
* @return The queue and a boolean flag which is true if the queue
* was created by this declare call false if it already existed.
*/
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ MessageStore* const _store = 0,
+ const ConnectionToken* const owner = 0);
/**
* Destroy the named queue.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Mon Oct 30 11:27:54 2006
@@ -19,7 +19,6 @@
#include "qpid/broker/SessionHandlerImpl.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/Router.h"
#include "qpid/broker/TopicExchange.h"
#include "assert.h"
@@ -40,11 +39,12 @@
exchanges(_exchanges),
cleaner(_cleaner),
timeout(_timeout),
- connectionHandler(new ConnectionHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
basicHandler(new BasicHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
exchangeHandler(new ExchangeHandlerImpl(this)),
queueHandler(new QueueHandlerImpl(this)),
+ txHandler(new TxHandlerImpl(this)),
framemax(65536),
heartbeat(0) {}
@@ -146,11 +146,11 @@
}
void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body, Router(*exchanges));
+ getChannel(channel)->handleHeader(body);
}
void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body, Router(*exchanges));
+ getChannel(channel)->handleContent(body);
}
void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -261,7 +261,8 @@
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
} else {
- std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -367,11 +368,16 @@
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, string& routingKey,
+ string& exchangeName, string& routingKey,
bool mandatory, bool immediate){
- Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg);
+ Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
}
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
@@ -395,4 +401,20 @@
void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
parent->getChannel(channel)->recover(requeue);
}
+// TxHandler::select: calls begin() on the addressed channel, then sends selectOk for that channel via parent->client.
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+ parent->getChannel(channel)->begin();
+ parent->client.getTx().selectOk(channel);
+}
+// TxHandler::commit: calls commit() on the addressed channel, then sends commitOk for that channel via parent->client.
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+ parent->getChannel(channel)->commit();
+ parent->client.getTx().commitOk(channel);
+}
+// TxHandler::rollback: calls rollback() on the addressed channel and sends rollbackOk, then calls recover(false) on it.
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+ parent->getChannel(channel)->rollback();
+ parent->client.getTx().rollbackOk(channel);
+ parent->getChannel(channel)->recover(false); // the argument is recover()'s 'requeue' flag, here false
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h Mon Oct 30 11:27:54 2006
@@ -71,11 +71,12 @@
AutoDelete* const cleaner;
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
- std::auto_ptr<ConnectionHandler> connectionHandler;
- std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<BasicHandler> basicHandler;
+ std::auto_ptr<ChannelHandler> channelHandler;
+ std::auto_ptr<ConnectionHandler> connectionHandler;
std::auto_ptr<ExchangeHandler> exchangeHandler;
std::auto_ptr<QueueHandler> queueHandler;
+ std::auto_ptr<TxHandler> txHandler;
std::map<u_int16_t, Channel*> channels;
std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -212,18 +213,29 @@
virtual ~BasicHandlerImpl(){}
};
+ class TxHandlerImpl : public virtual TxHandler{ // TxHandler implementation created by SessionHandlerImpl with itself as parent
+ SessionHandlerImpl* parent; // session handler passed to the constructor; the empty destructor does not delete it
+ public:
+ TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ virtual ~TxHandlerImpl() {}
+ virtual void select(u_int16_t channel); // begin() on the channel, then selectOk
+ virtual void commit(u_int16_t channel); // commit() on the channel, then commitOk
+ virtual void rollback(u_int16_t channel); // rollback() on the channel, rollbackOk, then recover(false)
+ };
+
+
inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+ inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }
- inline virtual AccessHandler* getAccessHandler(){ return 0; }
- inline virtual FileHandler* getFileHandler(){ return 0; }
- inline virtual StreamHandler* getStreamHandler(){ return 0; }
- inline virtual TxHandler* getTxHandler(){ return 0; }
- inline virtual DtxHandler* getDtxHandler(){ return 0; }
- inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
+ inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }
+ inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
+ inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
+ inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
+ inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct 30 11:27:54 2006
@@ -135,13 +135,13 @@
}
-void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
lock.acquire();
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
Queue::vector& qv(i->second);
for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- (*j)->deliver(msg);
+ msg.deliverTo(*j);
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Mon Oct 30 11:27:54 2006
@@ -82,7 +82,7 @@
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
virtual ~TopicExchange();
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+namespace qpid {
+ namespace broker {
+ class TransactionalStore{ // abstract interface: begin/commit/abort are all pure virtual
+ public:
+ virtual void begin() = 0; // TxBuffer::prepare calls this before preparing its ops
+ virtual void commit() = 0; // TxBuffer::prepare calls this once every op has prepared successfully
+ virtual void abort() = 0; // TxBuffer::prepare calls this as soon as one op fails to prepare
+ // virtual destructor so implementations can be destroyed through a TransactionalStore pointer
+ virtual ~TransactionalStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxAck.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+// Stores a copy of the accumulated ack and a reference to the caller's list of delivery records.
+TxAck::TxAck(AccumulatedAck _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){
+
+}
+// Prepare phase: calls discardIfCoveredBy(&acked) on every record in unacked; returns false if anything throws.
+bool TxAck::prepare() throw(){
+ try{
+ //dequeue all acked messages from their queues
+ std::for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+ return true;
+ }catch(...){ // declared throw(): a failure is reported and mapped to a false return instead of propagating
+ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+// Commit phase: erases from the unacked list every record whose coveredBy(&acked) returns true.
+void TxAck::commit() throw(){
+ //remove all acked records from the list
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+// Rollback phase: the body is empty, so this operation does nothing when rolled back.
+void TxAck::rollback() throw(){
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class TxAck : public TxOp{ // TxOp that applies an accumulated ack to a list of delivery records
+ AccumulatedAck acked; // held by value, copied from the constructor argument
+ std::list<DeliveryRecord>& unacked; // reference to a list supplied by the caller; commit() removes records from it
+ public:
+ TxAck(AccumulatedAck acked, std::list<DeliveryRecord>& unacked);
+ virtual bool prepare() throw(); // discards covered records; returns false if that throws
+ virtual void commit() throw(); // removes covered records from the unacked list
+ virtual void rollback() throw(); // empty implementation
+ virtual ~TxAck(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxBuffer.h"
+
+using namespace qpid::broker;
+// Prepares every enlisted op in order, inside a store transaction when store is non-null; true only if all prepare.
+bool TxBuffer::prepare(TransactionalStore* const store){
+ if(store) store->begin();
+ for(op_iterator i = ops.begin(); i != ops.end(); ++i){
+ if(!(*i)->prepare()){
+ if(store) store->abort(); // first failing op aborts the store transaction; remaining ops are not prepared
+ return false;
+ }
+ }
+ if(store) store->commit();
+ return true;
+}
+// Calls commit() on every enlisted op, from first to last.
+void TxBuffer::commit(){
+ for(op_iterator i = ops.begin(); i != ops.end(); ++i){
+ (*i)->commit();
+ }
+}
+// Calls rollback() on every enlisted op, from first to last.
+void TxBuffer::rollback(){
+ for(op_iterator i = ops.begin(); i != ops.end(); ++i){
+ (*i)->rollback();
+ }
+}
+// Appends op to the end of ops; prepare/commit/rollback iterate ops from begin() to end().
+void TxBuffer::enlist(TxOp* const op){
+ ops.push_back(op);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
------------------------------------------------------------------------------
svn:eol-style = native