You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/06 16:01:52 UTC
svn commit: r504172 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/
lib/common/framing/ tests/
Author: aconway
Date: Tue Feb 6 07:01:45 2007
New Revision: 504172
URL: http://svn.apache.org/viewvc?view=rev&rev=504172
Log:
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get the destination exchange from the
Message; don't assume only one message is in progress (multiple references
could be open).
* broker/BrokerMessageMessage.cpp,.h: MessageMessage now contains the transfer
body and a vector of append bodies, and can be constructed from a Reference.
* broker/CompletionHandler.h: Extracted from BrokerMessage, used for
MessageMessage also.
* broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to
registry.
* cpp/tests/start_broker: Increased wait time to 5 secs.
* cpp/tests/*: renamed DummyChannel as MockChannel.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (with props)
incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h
- copied, changed from r503923, incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (with props)
Removed:
incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb 6 07:01:45 2007
@@ -355,245 +355,5 @@
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-
-//
-// Message class method handlers
-//
-void
-BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel,
- const string& destination )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& /*destination*/,
- bool noAck )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
-
- // FIXME: get is probably Basic specific
- if(!connection.getChannel(channelId).get(queue, !noAck)){
-
- connection.client->getMessageHandler()->empty(channelId);
- }
-
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool /*global*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- //TODO: handle global
- connection.getChannel(channel).setPrefetchSize(prefetchSize);
- connection.getChannel(channel).setPrefetchCount(prefetchCount);
-
- connection.client->getMessageHandler()->ok(channel);
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel,
- bool requeue )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- connection.getChannel(channel).recover(requeue);
-
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool immediate,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
- if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
- connection.getChannel(channel).handlePublish(msg, exchange);
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
- }
-}
-
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb 6 07:01:45 2007
@@ -78,7 +78,7 @@
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
- if(tag.empty()) tag = tagGenerator.generate();
+ if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
@@ -187,6 +187,8 @@
if(blocked) queue->dispatch();
}
+// FIXME aconway 2007-02-05: Drop exchange member, calculate from
+// message in ::complete().
void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
Message::shared_ptr message(_message);
exchange = _exchange;
@@ -207,19 +209,19 @@
// TODO aconway 2007-01-17: Implement heartbeating.
}
-void Channel::complete(Message::shared_ptr& msg){
- if(exchange){
- if(transactional){
- TxPublish* deliverable = new TxPublish(msg);
- exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- txBuffer.enlist(new DeletingTxOp(deliverable));
- }else{
- DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- }
- exchange.reset();
- }else{
- std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+void Channel::complete(Message::shared_ptr msg) { // Route a completed message through the exchange named by msg->getExchange().
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get()); // NOTE(review): presumably redundant, ExchangeRegistry::get() throws when the exchange is missing -- confirm.
+ if(transactional) {
+ std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); // auto_ptr frees the TxPublish if route() throws before release()
+ exchange->route(*deliverable, msg->getRoutingKey(),
+ &(msg->getHeaderProperties()->getHeaders())); // NOTE(review): getHeaderProperties() is dereferenced unchecked; MessageMessage's override currently returns 0.
+ txBuffer.enlist(new DeletingTxOp(deliverable.release())); // the released pointer is handed to the DeletingTxOp enlisted in txBuffer
+ } else {
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(),
+ &(msg->getHeaderProperties()->getHeaders())); // NOTE(review): same unchecked dereference as in the transactional branch.
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb 6 07:01:45 2007
@@ -36,6 +36,7 @@
#include <Prefetch.h>
#include <TxBuffer.h>
#include "framing/ChannelAdapter.h"
+#include "CompletionHandler.h"
namespace qpid {
namespace broker {
@@ -51,9 +52,8 @@
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel :
- public framing::ChannelAdapter,
- private MessageBuilder::CompletionHandler
+class Channel : public framing::ChannelAdapter,
+ public CompletionHandler
{
class ConsumerImpl : public virtual Consumer
{
@@ -96,7 +96,7 @@
boost::scoped_ptr<BrokerAdapter> adapter;
- virtual void complete(Message::shared_ptr& msg);
+ virtual void complete(Message::shared_ptr msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Tue Feb 6 07:01:45 2007
@@ -18,25 +18,39 @@
* under the License.
*
*/
+#include <iostream>
#include "BrokerMessageMessage.h"
+#include "MessageTransferBody.h"
+#include "MessageAppendBody.h"
+#include "Reference.h"
+using namespace std;
using namespace qpid::broker;
-MessageMessage::MessageMessage(
- const qpid::framing::AMQMethodBody::shared_ptr _methodBody,
- const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
- methodBody(_methodBody)
-{
-}
+MessageMessage::MessageMessage(TransferPtr transfer_) // Build a message from a transfer body alone; appends is left default-constructed.
+ : Message(transfer_->getExchange(), transfer_->getRoutingKey(), // exchange, routing key and flags are all read from the transfer body
+ transfer_->getMandatory(), transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_)
+{}
+/*
+ * Build a message from a transfer body plus the append bodies held by ref.
+ * The base Message is initialised from the transfer body exactly as in the
+ * single-argument constructor; appends is copied from ref.getAppends().
+ */
+MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
+ : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+ transfer_->getMandatory(), transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_),
+ appends(ref.getAppends())
+{}
void MessageMessage::deliver(
- framing::ChannelAdapter& /*out*/,
+ framing::ChannelAdapter& /*channel*/,
const std::string& /*consumerTag*/,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
+ // FIXME aconway 2007-02-05:
+ cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
+ << " appends." << endl;
}
void MessageMessage::sendGetOk(
@@ -45,49 +59,50 @@
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
+ // FIXME aconway 2007-02-05:
}
bool MessageMessage::isComplete()
{
- return true;
+ return true; // FIXME aconway 2007-02-05:
}
u_int64_t MessageMessage::contentSize() const
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
bool MessageMessage::isPersistent()
{
- return false;
+ return false; // FIXME aconway 2007-02-05:
}
const ConnectionToken* const MessageMessage::getPublisher()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedHeaderSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedContentSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int64_t MessageMessage::expectedContentSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue Feb 6 07:01:45 2007
@@ -21,23 +21,28 @@
* under the License.
*
*/
-
+#include <vector>
#include "BrokerMessageBase.h"
+#include "Reference.h"
namespace qpid {
+
namespace framing {
-class AMQMethodBody;
+// Forward declarations of the framing bodies a MessageMessage is built from.
+class MessageTransferBody;
+class MessageAppendBody;
}
namespace broker {
-class MessageMessage: public Message{
- const qpid::framing::AMQMethodBody::shared_ptr methodBody;
+class Reference;
+class MessageMessage: public Message{
public:
- MessageMessage(
- const framing::AMQMethodBody::shared_ptr methodBody,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ typedef Reference::TransferPtr TransferPtr;
+ typedef Reference::AppendPtr AppendPtr;
+ typedef Reference::Appends Appends;
+
+ MessageMessage(TransferPtr transfer);
+ MessageMessage(TransferPtr transfer, const Reference&);
// Default destructor okay
@@ -52,7 +57,7 @@
u_int32_t framesize);
bool isComplete();
-
+
u_int64_t contentSize() const;
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
@@ -62,10 +67,16 @@
u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
+
+ TransferPtr getTransfer() { return transfer; }
+ const Appends& getAppends() { return appends; }
+ private:
+
+ const TransferPtr transfer;
+ const Appends appends;
};
-}
-}
+}}
#endif /*!_broker_BrokerMessage_h*/
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h Tue Feb 6 07:01:45 2007
@@ -0,0 +1,39 @@
+#ifndef _broker_CompletionHandler_h
+#define _broker_CompletionHandler_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Callback interface to handle completion of a message.
+ *
+ * Implementations override complete(), which receives the finished
+ * message as a Message::shared_ptr passed by value.
+ *
+ * NOTE(review): this header uses Message::shared_ptr but includes nothing
+ * that declares Message, so it presumably relies on the including file
+ * having declared it first -- confirm, and consider adding the include.
+ */
+class CompletionHandler
+{
+ public:
+ virtual ~CompletionHandler(){} // virtual so implementations can be destroyed through this interface
+ virtual void complete(Message::shared_ptr) = 0; // called with the completed message
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_CompletionHandler_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/CompletionHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/ExchangeRegistry.cpp Tue Feb 6 07:01:45 2007
@@ -59,7 +59,10 @@
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
Mutex::ScopedLock locker(lock);
- return exchanges[name];
+ Exchange::shared_ptr exchange =exchanges[name];
+ if (!exchange)
+ throw ChannelException(404, "Exchange not found:" + name);
+ return exchange;
}
namespace
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am Tue Feb 6 07:01:45 2007
@@ -69,6 +69,8 @@
QueueRegistry.h \
RecoveryManager.cpp \
RecoveryManager.h \
+ Reference.cpp \
+ Reference.h \
ConnectionFactory.cpp \
ConnectionFactory.h \
Connection.cpp \
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.cpp Tue Feb 6 07:01:45 2007
@@ -27,7 +27,10 @@
using namespace qpid::framing;
using std::auto_ptr;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
+MessageBuilder::MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const _store,
+ u_int64_t _stagingThreshold
+) :
handler(_handler),
store(_store),
stagingThreshold(_stagingThreshold)
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageBuilder.h Tue Feb 6 07:01:45 2007
@@ -29,22 +29,19 @@
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
#include <BasicPublishBody.h>
+#include "CompletionHandler.h"
namespace qpid {
namespace broker {
class MessageBuilder{
public:
- class CompletionHandler{
- public:
- virtual void complete(Message::shared_ptr&) = 0;
- virtual ~CompletionHandler(){}
- };
MessageBuilder(CompletionHandler* _handler,
MessageStore* const store = 0,
u_int64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+ void setHeader(framing::AMQHeaderBody::shared_ptr& header);
+ void addContent(framing::AMQContentBody::shared_ptr& content);
+ Message::shared_ptr getMessage() { return message; }
private:
Message::shared_ptr message;
CompletionHandler* handler;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue Feb 6 07:01:45 2007
@@ -23,6 +23,8 @@
#include "Connection.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
namespace qpid {
namespace broker {
@@ -33,23 +35,23 @@
// Message class method handlers
//
void
-MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
+MessageHandlerImpl::append(const MethodContext& context,
+ const string& reference,
const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).append(
+ boost::shared_polymorphic_downcast<MessageAppendBody>(
+ context.methodBody));
+ sendOk(context);
}
void
-MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
channel.cancel(destination);
-
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -61,10 +63,11 @@
}
void
-MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).close();
+ sendOk(context);
}
void
@@ -88,13 +91,16 @@
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ if(exclusive)
+ throw ChannelException(403, "Exclusive access cannot be granted");
+ else
+ throw ChannelException(
+ 403, "Access would violate previously granted exclusivity");
}
}
@@ -133,14 +139,15 @@
void
MessageHandlerImpl::ok( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
}
void
-MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.open(reference);
+ sendOk(context);
}
void
@@ -155,7 +162,7 @@
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -189,14 +196,14 @@
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /* immediate */,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /* routingKey */,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,27 +215,28 @@
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory )
+ bool /* mandatory */ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(context.methodBody, exchangeName,
- routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- // Don't handle reference content yet
- assert(body.isInline());
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ MessageTransferBody::shared_ptr transfer =
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody);
+ // Verify the exchange exists, will throw if not.
+ broker.getExchanges().get(exchangeName);
+ if (body.isInline()) {
+ MessageMessage* msg = new MessageMessage(transfer);
+ // FIXME aconway 2007-02-05: Remove exchange parameter.
+ // use shared_ptr for message.
+ channel.handlePublish(msg, Exchange::shared_ptr());
+ sendOk(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
}
+}
+
+/*
+ * Invoke ok(context) on the connection's client message handler.
+ * Shared by the method handlers in this file (append, cancel, close,
+ * consume, open, qos, transfer) that answer a method with "ok".
+ */
+void MessageHandlerImpl::sendOk(const MethodContext& context) {
+ connection.client->getMessageHandler()->ok(context);
}
}} // namespace qpid::broker
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h Tue Feb 6 07:01:45 2007
@@ -19,23 +19,25 @@
*
*/
+#include <memory>
+
#include "AMQP_ServerOperations.h"
+#include "Reference.h"
+#include "BrokerChannel.h"
namespace qpid {
namespace broker {
-class Channel;
class Connection;
class Broker;
+class MessageMessage;
-class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler {
- Channel& channel;
- Connection& connection;
- Broker& broker;
-
+class MessageHandlerImpl :
+ public framing::AMQP_ServerOperations::MessageHandler
+{
public:
MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
+ : channel(ch), connection(c), broker(b), references(ch) {}
void append(const framing::MethodContext&,
const std::string& reference,
@@ -116,6 +118,13 @@
const framing::FieldTable& applicationHeaders,
framing::Content body,
bool mandatory );
+ private:
+ void sendOk(const framing::MethodContext&);
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ ReferenceRegistry references;
};
}} // namespace qpid::broker
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp Tue Feb 6 07:01:45 2007
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/bind.hpp>
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "QpidError.h"
+#include "CompletionHandler.h"
+
+namespace qpid {
+namespace broker {
+
+Reference& ReferenceRegistry::open(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ // TODO aconway 2007-02-05: should we throw Channel or Connection
+ // exceptions here?
+ if (i != references.end())
+ THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id);
+ return references[id] = Reference(id, this);
+}
+
+Reference& ReferenceRegistry::get(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ if (i == references.end())
+ THROW_QPID_ERROR(
+ CLIENT_ERROR, "Attempt to use non-existent reference "+id);
+ return i->second;
+}
+
+void Reference::close() {
+ for_each(transfers.begin(), transfers.end(),
+ boost::bind(&Reference::complete, this, _1));
+ registry->references.erase(getId());
+}
+
+void Reference::complete(TransferPtr transfer) {
+ MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
+ registry->handler.complete(msg);
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h Tue Feb 6 07:01:45 2007
@@ -0,0 +1,111 @@
+#ifndef _broker_Reference_h
+#define _broker_Reference_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+#include <vector>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/range.hpp>
+
+namespace qpid {
+
+namespace framing {
+class MessageTransferBody;
+class MessageAppendBody;
+}
+
+namespace broker {
+
+class CompletionHandler;
+class ReferenceRegistry;
+
+/**
+ * A reference is an accumulation point for data in a multi-frame
+ * message. A reference can be used by multiple transfer commands, so
+ * the reference tracks which commands are using it. When the reference
+ * is closed, all the associated transfers are completed.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class Reference
+{
+ public:
+ typedef std::string Id;
+ typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
+ typedef std::vector<TransferPtr> Transfers;
+ typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
+ typedef std::vector<AppendPtr> Appends;
+
+ Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
+ : id(id_), registry(reg) {}
+
+ const std::string& getId() const { return id; }
+
+ /** Add a transfer to be completed with this reference */
+ void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+
+ /** Append more data to the reference */
+ void append(AppendPtr ptr) { appends.push_back(ptr); }
+
+ /** Close the reference, complete each associated transfer */
+ void close();
+
+ const Appends& getAppends() const { return appends; }
+ const Transfers& getTransfers() const { return transfers; }
+
+ private:
+ void complete(TransferPtr transfer);
+
+ Id id;
+ ReferenceRegistry* registry;
+ Transfers transfers;
+ Appends appends;
+};
+
+
+/**
+ * A registry/factory for references.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class ReferenceRegistry {
+ public:
+ ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {};
+ Reference& open(const Reference::Id& id);
+ Reference& get(const Reference::Id& id);
+
+ private:
+ typedef std::map<Reference::Id, Reference> ReferenceMap;
+ CompletionHandler& handler;
+ ReferenceMap references;
+
+ // Reference calls references.erase() and uses handler.
+ friend class Reference;
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_Reference_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/MethodContext.h Tue Feb 6 07:01:45 2007
@@ -64,7 +64,10 @@
};
// FIXME aconway 2007-02-01: Method context only required on Handler
-// functions, not on Proxy functions.
+// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*)
+// on AMQBody and set it during decoding then we could get rid of the context.
+
+
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Tue Feb 6 07:01:45 2007
@@ -28,7 +28,7 @@
#include <memory>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
#include "broker/Connection.h"
#include "ProtocolInitiation.h"
@@ -39,7 +39,7 @@
using std::string;
using std::queue;
-struct DummyHandler : ConnectionOutputHandler{
+struct MockHandler : ConnectionOutputHandler{
std::vector<AMQFrame*> frames;
void send(AMQFrame* frame){ frames.push_back(frame); }
@@ -60,7 +60,7 @@
Broker::shared_ptr broker;
Connection connection;
- DummyHandler handler;
+ MockHandler handler;
class MockMessageStore : public NullMessageStore
{
@@ -240,10 +240,10 @@
Channel channel(
connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
-
+
Message* msg = new BasicMessage(
0, "my_exchange", "my_routing_key", false, false,
- DummyChannel::basicGetBody());
+ MockChannel::basicGetBody());
store.expect();
store.stage(msg);
@@ -253,7 +253,8 @@
store.destroy(msg);
store.test();
- Exchange::shared_ptr exchange(new FanOutExchange("my_exchange"));
+ Exchange::shared_ptr exchange =
+ broker->getExchanges().declare("my_exchange", "fanout").first;
Queue::shared_ptr queue(new Queue("my_queue"));
exchange->bind(queue, "", 0);
@@ -333,7 +334,7 @@
{
BasicMessage* msg = new BasicMessage(
0, exchange, routingKey, false, false,
- DummyChannel::basicGetBody());
+ MockChannel::basicGetBody());
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/InMemoryContentTest.cpp Tue Feb 6 07:01:45 2007
@@ -24,7 +24,7 @@
#include <iostream>
#include <list>
#include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
using std::list;
using std::string;
@@ -58,7 +58,7 @@
void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
{
InMemoryContent content;
- DummyChannel channel(3);
+ MockChannel channel(3);
addframes(content, inCount, in);
content.send(channel, framesize);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/LazyLoadedContentTest.cpp Tue Feb 6 07:01:45 2007
@@ -26,7 +26,7 @@
#include <list>
#include <sstream>
#include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
using std::list;
using std::string;
using boost::dynamic_pointer_cast;
@@ -92,7 +92,7 @@
{
TestMessageStore store(in);
LazyLoadedContent content(&store, 0, in.size());
- DummyChannel channel(3);
+ MockChannel channel(3);
content.send(channel, framesize);
CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Tue Feb 6 07:01:45 2007
@@ -9,12 +9,6 @@
-I$(top_srcdir)/lib/common/framing \
$(APR_CXXFLAGS)
-EXTRA_DIST = \
- topictest \
- qpid_test_plugin.h \
- MockConnectionInputHandler.h
-
-
client_exe_tests = \
client_test \
echo_service \
@@ -32,6 +26,7 @@
MessageBuilderTest \
MessageHandlerTest \
MessageTest \
+ ReferenceTest \
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
@@ -69,7 +64,14 @@
CLIENT_TESTS = client_test quick_topictest
TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker
-EXTRA_DIST += $(TESTS) topictest
+
+EXTRA_DIST = \
+ $(TESTS) \
+ topictest \
+ qpid_test_plugin.h \
+ MockConnectionInputHandler.h \
+ MockChannel.h \
+ InProcessBroker.h
include gen.mk
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageBuilderTest.cpp Tue Feb 6 07:01:45 2007
@@ -26,7 +26,7 @@
#include <qpid_test_plugin.h>
#include <iostream>
#include <memory>
-#include "DummyChannel.h"
+#include "MockChannel.h"
using namespace boost;
using namespace qpid::broker;
@@ -35,10 +35,10 @@
class MessageBuilderTest : public CppUnit::TestCase
{
- struct DummyHandler : MessageBuilder::CompletionHandler{
+ struct MockHandler : CompletionHandler {
Message::shared_ptr msg;
- virtual void complete(Message::shared_ptr& _msg){
+ virtual void complete(Message::shared_ptr _msg){
msg = _msg;
}
};
@@ -114,13 +114,13 @@
public:
void testHeaderOnly(){
- DummyHandler handler;
+ MockHandler handler;
MessageBuilder builder(&handler);
Message::shared_ptr message(
new BasicMessage(
0, "test", "my_routing_key", false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -132,14 +132,14 @@
}
void test1ContentFrame(){
- DummyHandler handler;
+ MockHandler handler;
MessageBuilder builder(&handler);
string data1("abcdefg");
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +154,7 @@
}
void test2ContentFrames(){
- DummyHandler handler;
+ MockHandler handler;
MessageBuilder builder(&handler);
string data1("abcdefg");
@@ -162,7 +162,7 @@
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -185,7 +185,7 @@
//loaded content is in use)
TestMessageStore store(14);
{
- DummyHandler handler;
+ MockHandler handler;
MessageBuilder builder(&handler, &store, 5);
string data1("abcdefg");
@@ -193,7 +193,7 @@
Message::shared_ptr message(
new BasicMessage(0, "test", "my_routing_key", false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MessageTest.cpp Tue Feb 6 07:01:45 2007
@@ -23,7 +23,7 @@
#include <iostream>
#include <AMQP_HighestVersion.h>
#include "AMQFrame.h"
-#include "DummyChannel.h"
+#include "MockChannel.h"
using namespace boost;
using namespace qpid::broker;
@@ -47,7 +47,7 @@
BasicMessage::shared_ptr msg(
new BasicMessage(0, exchange, routingKey, false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -73,7 +73,7 @@
CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
- DummyChannel channel(1);
+ MockChannel channel(1);
// FIXME aconway 2007-02-02: deliver should take const ProtocolVersion&
msg->deliver(channel, "ignore", 0, 100);
CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
Copied: incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h (from r503923, incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h?view=diff&rev=504172&p1=incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h&r1=503923&p2=incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/DummyChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/MockChannel.h Tue Feb 6 07:01:45 2007
@@ -1,5 +1,5 @@
-#ifndef _tests_DummyChannel_h
-#define _tests_DummyChannel_h
+#ifndef _tests_MockChannel_h
+#define _tests_MockChannel_h
/*
*
@@ -26,16 +26,16 @@
#include "framing/AMQFrame.h"
#include "BasicGetBody.h"
-/** Dummy output handler to collect frames */
-struct DummyOutputHandler : public qpid::framing::OutputHandler {
+/** Mock output handler to collect frames */
+struct MockOutputHandler : public qpid::framing::OutputHandler {
std::vector<qpid::framing::AMQFrame*> frames;
void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); }
};
/**
- * Combination dummy OutputHandler and ChannelAdapter for tests.
+ * Combination mock OutputHandler and ChannelAdapter for tests.
*/
-struct DummyChannel : public qpid::framing::ChannelAdapter
+struct MockChannel : public qpid::framing::ChannelAdapter
{
typedef qpid::framing::BasicGetBody Body;
static Body::shared_ptr basicGetBody() {
@@ -43,9 +43,9 @@
new Body(qpid::framing::ProtocolVersion()));
}
- DummyOutputHandler out;
+ MockOutputHandler out;
- DummyChannel(qpid::framing::ChannelId id) {
+ MockChannel(qpid::framing::ChannelId id) {
init(id, out, qpid::framing::ProtocolVersion());
}
@@ -66,4 +66,4 @@
};
-#endif /*!_tests_DummyChannel_h*/
+#endif /*!_tests_MockChannel_h*/
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/QueueTest.cpp Tue Feb 6 07:01:45 2007
@@ -22,7 +22,7 @@
#include <QueueRegistry.h>
#include <qpid_test_plugin.h>
#include <iostream>
-#include "DummyChannel.h"
+#include "MockChannel.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -58,7 +58,7 @@
Message::shared_ptr message(std::string exchange, std::string routingKey) {
return Message::shared_ptr(
new BasicMessage(0, exchange, routingKey, true, true,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
}
void testConsumers(){
Added: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp?view=auto&rev=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp Tue Feb 6 07:01:45 2007
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "qpid_test_plugin.h"
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "MessageTransferBody.h"
+#include "MessageAppendBody.h"
+#include "CompletionHandler.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+class ReferenceTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ReferenceTest);
+ CPPUNIT_TEST(testRegistry);
+ CPPUNIT_TEST(testReference);
+ CPPUNIT_TEST_SUITE_END();
+
+
+ struct MockCompletionHandler : public CompletionHandler {
+ std::vector<Message::shared_ptr> messages;
+ void complete(Message::shared_ptr msg) { messages.push_back(msg); }
+ };
+
+ MockCompletionHandler handler;
+ ProtocolVersion v;
+ ReferenceRegistry registry;
+ MessageTransferBody::shared_ptr t1, t2;
+ MessageAppendBody::shared_ptr a1, a2;
+ public:
+
+ ReferenceTest() :
+ registry(handler),
+ t1(new MessageTransferBody(v)),
+ t2(new MessageTransferBody(v)),
+ a1(new MessageAppendBody(v)),
+ a2(new MessageAppendBody(v))
+ {}
+
+ void testRegistry() {
+ Reference& ref = registry.open("foo");
+ CPPUNIT_ASSERT_EQUAL(string("foo"), ref.getId());
+ CPPUNIT_ASSERT(&ref == &registry.get("foo"));
+ try {
+ registry.get("none");
+ CPPUNIT_FAIL("Expected exception");
+ } catch (...) {}
+ try {
+ registry.open("foo");
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...) {}
+ }
+
+ MessageMessage& handlerMessage(size_t i) {
+ CPPUNIT_ASSERT(handler.messages.size() > i);
+ MessageMessage* msg = dynamic_cast<MessageMessage*>(
+ handler.messages[i].get());
+ CPPUNIT_ASSERT(msg);
+ return *msg;
+ }
+
+ void testReference() {
+ Reference& ref = registry.open("foo");
+ ref.transfer(t1);
+ ref.transfer(t2);
+ CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size());
+ ref.append(a1);
+ ref.append(a2);
+ CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size());
+ ref.close();
+ try {
+ registry.open("foo");
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...) {}
+
+ vector<Message::shared_ptr>& messages = handler.messages;
+ CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size());
+
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1);
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2);
+
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2);
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1);
+ CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2);
+ }
+
+
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest);
Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxAckTest.cpp Tue Feb 6 07:01:45 2007
@@ -25,7 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
-#include "DummyChannel.h"
+#include "MockChannel.h"
using std::list;
using std::vector;
@@ -72,7 +72,7 @@
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(
new BasicMessage(0, "exchange", "routing_key", false, false,
- DummyChannel::basicGetBody()));
+ MockChannel::basicGetBody()));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/TxPublishTest.cpp Tue Feb 6 07:01:45 2007
@@ -25,7 +25,7 @@
#include <iostream>
#include <list>
#include <vector>
-#include "DummyChannel.h"
+#include "MockChannel.h"
using std::list;
using std::pair;
@@ -78,7 +78,7 @@
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
msg(new BasicMessage(0, "exchange", "routing_key", false, false,
- DummyChannel::basicGetBody())),
+ MockChannel::basicGetBody())),
op(msg, &xid)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker?view=diff&rev=504172&r1=504171&r2=504172
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/start_broker Tue Feb 6 07:01:45 2007
@@ -11,4 +11,4 @@
# FIXME aconway 2007-01-18: qpidd should not return till it is accepting
# connections, remove arbitrary sleep.
-sleep 2
+sleep 5