You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jo...@apache.org on 2011/02/08 15:31:19 UTC
svn commit: r1068417 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/SessionAdapter.cpp tests/MessagingSessionTests.cpp
Author: jonathan
Date: Tue Feb 8 14:31:19 2011
New Revision: 1068417
URL: http://svn.apache.org/viewvc?rev=1068417&view=rev
Log:
Reverts r1068042.
We will fix this bug in the C++ client messaging library rather than the broker.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1068417&r1=1068416&r2=1068417&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Feb 8 14:31:19 2011
@@ -60,8 +60,8 @@ SessionAdapter::SessionAdapter(SemanticS
static const std::string _TRUE("true");
static const std::string _FALSE("false");
-void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
- const string& alternateExchange,
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+ const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
AclModule* acl = getBroker().getAcl();
@@ -74,7 +74,7 @@ void SessionAdapter::ExchangeHandlerImpl
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
}
-
+
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
@@ -84,7 +84,7 @@ void SessionAdapter::ExchangeHandlerImpl
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
- }else{
+ }else{
if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
}
@@ -128,10 +128,10 @@ void SessionAdapter::ExchangeHandlerImpl
|| !exchange->getAlternate()))
throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
<< (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
- << ", requested "
+ << ", requested "
<< alternate->getName()));
}
-
+
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
AclModule* acl = getBroker().getAcl();
@@ -164,12 +164,12 @@ ExchangeQueryResult SessionAdapter::Exch
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const NotFoundException& /*e*/) {
- return ExchangeQueryResult("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
-void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments)
{
AclModule* acl = getBroker().getAcl();
@@ -201,7 +201,7 @@ void SessionAdapter::ExchangeHandlerImpl
throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
}
}
-
+
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
@@ -245,7 +245,7 @@ ExchangeBoundResult SessionAdapter::Exch
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
-
+
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
@@ -296,10 +296,10 @@ void SessionAdapter::QueueHandlerImpl::d
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
-
-bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
-{
- return session.isLocal(t);
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
}
@@ -310,15 +310,15 @@ QueueQueryResult SessionAdapter::QueueHa
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
}
-
+
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
if (queue) {
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-
- return QueueQueryResult(queue->getName(),
- alternateExchange ? alternateExchange->getName() : "",
- queue->isDurable(),
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
queue->hasExclusiveOwner(),
queue->isAutoDelete(),
queue->getSettings(),
@@ -330,9 +330,9 @@ QueueQueryResult SessionAdapter::QueueHa
}
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
+ bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
+{
AclModule* acl = getBroker().getAcl();
if (acl) {
std::map<acl::Property, std::string> params;
@@ -358,7 +358,7 @@ void SessionAdapter::QueueHandlerImpl::d
queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
+ std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().getQueues().declare(name, durable,
autoDelete,
exclusive ? &session : 0);
@@ -395,12 +395,12 @@ void SessionAdapter::QueueHandlerImpl::d
queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(&session))
+ if (exclusive && !queue->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
-}
-
-
+}
+
+
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
@@ -409,8 +409,8 @@ void SessionAdapter::QueueHandlerImpl::p
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
-}
-
+}
+
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
AclModule* acl = getBroker().getAcl();
@@ -421,7 +421,7 @@ void SessionAdapter::QueueHandlerImpl::d
}
Queue::shared_ptr q = getQueue(queue);
- if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
+ if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
<< queue << "; it is exclusive to another session"));
if(ifEmpty && q->getMessageCount() > 0){
@@ -434,14 +434,6 @@ void SessionAdapter::QueueHandlerImpl::d
QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
}
-
- DeliveryRecords::iterator i;
- for (i=state.getUnacked().begin(); i != state.getUnacked().end(); ++i) {
- if (i->getQueue() == q) {
- i->release(true);
- }
- }
-
q->destroy();
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
@@ -451,9 +443,9 @@ void SessionAdapter::QueueHandlerImpl::d
agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
q->notifyDeleted();
}
-}
+}
-SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
@@ -490,7 +482,7 @@ SessionAdapter::MessageHandlerImpl::subs
AclModule* acl = getBroker().getAcl();
if (acl)
- {
+ {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
@@ -503,8 +495,8 @@ SessionAdapter::MessageHandlerImpl::subs
throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
<< queue->getName()));
- state.consume(destination, queue,
- acceptMode == 0, acquireMode == 0, exclusive,
+ state.consume(destination, queue,
+ acceptMode == 0, acquireMode == 0, exclusive,
resumeId, resumeTtl, arguments);
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -541,9 +533,9 @@ void SessionAdapter::MessageHandlerImpl:
//unknown
throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
}
-
+
}
-
+
void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode)
{
if (mode == 0) {
@@ -553,18 +545,18 @@ void SessionAdapter::MessageHandlerImpl:
//window
state.setWindowMode(destination);
} else{
- throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
}
}
-
+
void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
{
- state.flush(destination);
+ state.flush(destination);
}
void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
{
- state.stop(destination);
+ state.stop(destination);
}
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
@@ -592,7 +584,7 @@ framing::MessageResumeResult SessionAdap
{
throw NotImplementedException("resuming transfers not yet supported");
}
-
+
void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
@@ -627,7 +619,7 @@ void SessionAdapter::TxHandlerImpl::comm
}
void SessionAdapter::TxHandlerImpl::rollback()
-{
+{
state.rollback();
}
@@ -664,7 +656,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
return XaResult(XA_STATUS_XA_OK);
}
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -683,7 +675,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
}
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -693,7 +685,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
bool ok = getBroker().getDtxManager().prepare(convert(xid));
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -704,7 +696,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -715,14 +707,14 @@ XaResult SessionAdapter::DtxHandlerImpl:
getBroker().getDtxManager().rollback(convert(xid));
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
+ getBroker().getStore().collectPreparedXids(xids);
/*
* create array of long structs
*/
@@ -743,7 +735,7 @@ void SessionAdapter::DtxHandlerImpl::for
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return DtxGetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1068417&r1=1068416&r2=1068417&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Tue Feb 8 14:31:19 2011
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : pub
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerificati
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,57 +775,21 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscri
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
}
-
-QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
-{
- MessagingFixture fix;
- const uint capacity = 5;
-
- Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
- Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
- Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
-
- receiver1.setCapacity(capacity);
- receiver2.setCapacity(2*capacity);
-
- Message out("test-message");
- for (uint i = 0; i < capacity*2; ++i) {
- sender.send(out);
- }
-
- // Read half the messages, do not acknowledge
- Message in;
- for (uint i = 0; i < capacity; ++i) {
- in = receiver1.fetch(Duration::SECOND * 5);
- BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
- }
-
- receiver1.close();
-
- // Make sure all unacked messages were sent to the alternate
- // exchange when the queue was deleted.
- for (uint i = 0; i < capacity*2; ++i) {
- in = receiver2.fetch(Duration::SECOND * 5);
- BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
- }
-}
-
-
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
{
MessagingFixture fix;
@@ -864,7 +828,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -872,7 +836,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -883,7 +847,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org