You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC
svn commit: r1534394 [3/22] - in /qpid/branches/linearstore/qpid: ./ cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/
cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/
cpp/examples/qmf-agent/ cpp/include/qpid/ cp...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Oct 21 22:04:51 2013
@@ -2122,19 +2122,21 @@ bool ManagementAgent::authorizeAgentMess
string methodName;
string cid;
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
//
- // If the message is larger than our working buffer size, we can't determine if it's
- // authorized or not. In this case, return true (authorized) if there is no ACL in place,
- // otherwise return false;
+ // If the message is larger than our working buffer size (or if it
+ // could not be converted to an 0-10 messgae-transfer), we can't
+ // determine if it's authorized or not. In this case, return true
+ // (authorized) if there is no ACL in place, otherwise return
+ // false;
//
- if (msg.getContentSize() > qmfV1BufferSize)
+ if (!transfer || transfer->getContentSize() > qmfV1BufferSize)
return broker->getAcl() == 0;
- inBuffer.putRawData(msg.getContent());
+ inBuffer.putRawData(transfer->getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p =
transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
@@ -2283,13 +2285,13 @@ void ManagementAgent::dispatchAgentComma
ResizableBuffer inBuffer(qmfV1BufferSize);
uint8_t opcode;
- if (msg.getContentSize() > qmfV1BufferSize) {
+ if (transfer->getContentSize() > qmfV1BufferSize) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
- msg.getContentSize());
+ transfer->getContentSize());
return;
}
- inBuffer.putRawData(msg.getContent());
+ inBuffer.putRawData(transfer->getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp Mon Oct 21 22:04:51 2013
@@ -31,6 +31,9 @@
namespace qpid {
namespace messaging {
+// Explicitly instantiate Handle superclass
+template class Handle<ConnectionImpl>;
+
using namespace qpid::types;
typedef PrivateImplRef<qpid::messaging::Connection> PI;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp Mon Oct 21 22:04:51 2013
@@ -19,7 +19,7 @@
*
*/
#include "ProtocolRegistry.h"
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/LoadPlugins.h"
#include <map>
@@ -61,7 +61,7 @@ ConnectionImpl* ProtocolRegistry::create
Registry::const_iterator i = theRegistry().find(name.asString());
if (i != theRegistry().end()) return (i->second)(url, stripped);
else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
- else throw qpid::Exception("Unsupported protocol: " + name.asString());
+ else throw MessagingException("Unsupported protocol: " + name.asString());
}
return 0;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp Mon Oct 21 22:04:51 2013
@@ -29,6 +29,9 @@
namespace qpid {
namespace messaging {
+// Explicitly instantiate Handle superclass
+template class Handle<ReceiverImpl>;
+
typedef PrivateImplRef<qpid::messaging::Receiver> PI;
Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Oct 21 22:04:51 2013
@@ -27,6 +27,10 @@
namespace qpid {
namespace messaging {
+
+// Explicitly instantiate Handle superclass
+template class Handle<SenderImpl>;
+
typedef PrivateImplRef<qpid::messaging::Sender> PI;
Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Session.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Session.cpp Mon Oct 21 22:04:51 2013
@@ -30,6 +30,9 @@
namespace qpid {
namespace messaging {
+// Explicitly instantiate Handle superclass
+template class Handle<SessionImpl>;
+
typedef PrivateImplRef<qpid::messaging::Session> PI;
Session::Session(SessionImpl* impl) { PI::ctor(*this, impl); }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Mon Oct 21 22:04:51 2013
@@ -22,6 +22,7 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/types/encodings.h"
#include "qpid/log/Statement.h"
#include <vector>
#include <set>
@@ -290,6 +291,127 @@ void write(pn_data_t* data, const Varian
break;
}
}
+bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value);
+bool read(pn_data_t* data, qpid::types::Variant& value)
+{
+ return read(data, pn_data_type(data), value);
+}
+void readList(pn_data_t* data, qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(data, e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+void readMap(pn_data_t* data, qpid::types::Variant::Map& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
+ std::string key = convert(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant e;
+ if (read(data, e)) value[key]= e;
+ }
+ pn_data_exit(data);
+}
+void readArray(pn_data_t* data, qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_array(data);
+ pn_type_t type = pn_data_get_array_type(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(data, type, e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value)
+{
+ switch (type) {
+ case PN_NULL:
+ if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
+ return true;
+ case PN_BOOL:
+ value = pn_data_get_bool(data);
+ return true;
+ case PN_UBYTE:
+ value = pn_data_get_ubyte(data);
+ return true;
+ case PN_BYTE:
+ value = pn_data_get_byte(data);
+ return true;
+ case PN_USHORT:
+ value = pn_data_get_ushort(data);
+ return true;
+ case PN_SHORT:
+ value = pn_data_get_short(data);
+ return true;
+ case PN_UINT:
+ value = pn_data_get_uint(data);
+ return true;
+ case PN_INT:
+ value = pn_data_get_int(data);
+ return true;
+ case PN_CHAR:
+ value = pn_data_get_char(data);
+ return true;
+ case PN_ULONG:
+ value = pn_data_get_ulong(data);
+ return true;
+ case PN_LONG:
+ value = pn_data_get_long(data);
+ return true;
+ case PN_TIMESTAMP:
+ value = pn_data_get_timestamp(data);
+ return true;
+ case PN_FLOAT:
+ value = pn_data_get_float(data);
+ return true;
+ case PN_DOUBLE:
+ value = pn_data_get_double(data);
+ return true;
+ case PN_UUID:
+ value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
+ return true;
+ case PN_BINARY:
+ value = convert(pn_data_get_binary(data));
+ value.setEncoding(qpid::types::encodings::BINARY);
+ return true;
+ case PN_STRING:
+ value = convert(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::UTF8);
+ return true;
+ case PN_SYMBOL:
+ value = convert(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::ASCII);
+ return true;
+ case PN_LIST:
+ value = qpid::types::Variant::List();
+ readList(data, value.asList());
+ return true;
+ break;
+ case PN_MAP:
+ value = qpid::types::Variant::Map();
+ readMap(data, value.asMap());
+ return true;
+ case PN_ARRAY:
+ value = qpid::types::Variant::List();
+ readArray(data, value.asList());
+ return true;
+ case PN_DESCRIBED:
+ case PN_DECIMAL32:
+ case PN_DECIMAL64:
+ case PN_DECIMAL128:
+ default:
+ return false;
+ }
+
+}
+
const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
const uint32_t DEFAULT_TIMEOUT(0);
}
@@ -354,7 +476,7 @@ AddressHelper::AddressHelper(const Addre
properties[LIFETIME_POLICY] = DELETE_ON_CLOSE;
}
- if (properties.size() && !(isTemporary || createPolicy.size())) {
+ if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) {
QPID_LOG(warning, "Properties will be ignored! " << address);
}
@@ -420,10 +542,48 @@ void AddressHelper::addFilter(const std:
filters.push_back(Filter(name, descriptor, value));
}
+namespace {
+bool checkLifetimePolicy(const std::string& requested, const std::string& actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true;
+ else return actual == requested;
+}
+bool checkLifetimePolicy(const std::string& requested, uint64_t actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL);
+ else
+ return false;
+}
+bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual)
+{
+ bool result(false);
+ if (pn_data_is_described(actual)) {
+ pn_data_enter(actual);
+ pn_data_next(actual);
+ if (pn_data_type(actual) == PN_ULONG) {
+ result = checkLifetimePolicy(requested, pn_data_get_ulong(actual));
+ } else if (pn_data_type(actual) == PN_SYMBOL) {
+ result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual)));
+ }
+ pn_data_exit(actual);
+ }
+ return result;
+}
+}
void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
{
if (assertEnabled(mode)) {
- QPID_LOG(debug, "checking assertions: " << capabilities);
+ QPID_LOG(debug, "checking capabilities: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
@@ -493,6 +653,40 @@ void AddressHelper::checkAssertion(pn_te
}
}
if (!first) throw qpid::messaging::AssertionFailed(missing.str());
+
+ //assert on properties (Note: this violates the AMQP 1.0
+ //specification - as does the create option - by sending
+ //node-properties even if the dynamic option is not
+ //set. However this can be avoided by not specifying any node
+ //properties when asserting)
+ if (!type.empty() || durableNode || !properties.empty()) {
+ qpid::types::Variant::Map requested = properties;
+ if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE;
+ if (durableNode) requested[DURABLE] = true;
+
+ data = pn_terminus_properties(terminus);
+ if (pn_data_next(data)) {
+ size_t count = pn_data_get_map(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ std::string key = convert(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant::Map::const_iterator j = requested.find(key);
+ qpid::types::Variant v;
+ if (j != requested.end() &&
+ ((key == LIFETIME_POLICY && checkLifetimePolicy(j->second.asString(), data)) ||
+ (read(data, v) && v.asString() == j->second.asString()))) {
+ requested.erase(j->first);
+ }
+ }
+ pn_data_exit(data);
+ if (!requested.empty()) {
+ std::stringstream missing;
+ missing << "Requested node properties not met: " << requested;
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+ }
+ }
}
}
@@ -582,6 +776,8 @@ void AddressHelper::configure(pn_link_t*
//application expects name of node to be as specified
setNodeProperties(terminus);
createOnDemand = true;
+ } else if (assertEnabled(mode)) {
+ setNodeProperties(terminus);
}
}
@@ -616,6 +812,13 @@ void AddressHelper::configure(pn_link_t*
}
if (isUnreliable()) {
pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ } else if (!reliability.empty()) {
+ if (reliability == EXACTLY_ONCE ) {
+ QPID_LOG(warning, "Unsupported reliability mode: " << reliability);
+ } else if (reliability != AT_LEAST_ONCE ) {
+ QPID_LOG(warning, "Unrecognised reliability mode: " << reliability);
+ }
+ pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED);
}
}
@@ -688,7 +891,7 @@ void AddressHelper::setNodeProperties(pn
putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
} else {
pn_data_put_symbol(data, convert(i->first));
- pn_data_put_string(data, convert(i->second.asString()));
+ write(data, i->second);
}
}
pn_data_exit(data);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Oct 21 22:04:51 2013
@@ -234,6 +234,14 @@ void ConnectionContext::acknowledge(boos
wakeupDriver();
}
+void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
+ ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
+ wakeupDriver();
+}
+
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -468,6 +476,15 @@ void ConnectionContext::checkClosed(boos
}
}
+bool ConnectionContext::isClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ try {
+ checkClosed(ssn, lnk->receiver);
+ return false;
+ } catch (const LinkError&) {
+ return true;
+ }
+}
void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
checkClosed(ssn, lnk->receiver);
@@ -584,7 +601,7 @@ std::size_t ConnectionContext::decodePla
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ throw MessagingException(QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
@@ -608,7 +625,7 @@ std::size_t ConnectionContext::encodePla
haveOutput = true;
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ throw MessagingException(QPID_MSG("Error on output: " << getError()));
} else if (n == PN_EOS) {
haveOutput = false;
return 0;//Is this right?
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Oct 21 22:04:51 2013
@@ -77,10 +77,12 @@ class ConnectionContext : public qpid::s
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+ void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Mon Oct 21 22:04:51 2013
@@ -20,7 +20,9 @@
*/
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/MessageImpl.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/DataBuilder.h"
#include "qpid/amqp/ListBuilder.h"
@@ -100,66 +102,73 @@ const char* EncodedMessage::getData() co
void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
{
- //initial scan of raw data
- qpid::amqp::Decoder decoder(data, size);
- InitialScan reader(*this, impl);
- decoder.read(reader);
- bareMessage = reader.getBareMessage();
- if (bareMessage.data && !bareMessage.size) {
- bareMessage.size = (data + size) - bareMessage.data;
+ try {
+ //initial scan of raw data
+ qpid::amqp::Decoder decoder(data, size);
+ InitialScan reader(*this, impl);
+ decoder.read(reader);
+ bareMessage = reader.getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = (data + size) - bareMessage.data;
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
-
}
void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; }
void EncodedMessage::populate(qpid::types::Variant::Map& map) const
{
- //decode application properties
- if (applicationProperties) {
- qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
- decoder.readMap(map);
- }
- //add in 'x-amqp-' prefixed values
- if (!!firstAcquirer) {
- map["x-amqp-first-acquirer"] = firstAcquirer.get();
- }
- if (!!deliveryCount) {
- map["x-amqp-delivery-count"] = deliveryCount.get();
- }
- if (to) {
- map["x-amqp-to"] = to.str();
- }
- if (!!absoluteExpiryTime) {
- map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
- }
- if (!!creationTime) {
- map["x-amqp-creation-time"] = creationTime.get();
- }
- if (groupId) {
- map["x-amqp-group-id"] = groupId.str();
- }
- if (!!groupSequence) {
- map["x-amqp-qroup-sequence"] = groupSequence.get();
- }
- if (replyToGroupId) {
- map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
- }
- //add in any annotations
- if (deliveryAnnotations) {
- qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-delivery-annotations"] = decoder.readMap();
- } else {
+ try {
+ //decode application properties
+ if (applicationProperties) {
+ qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
decoder.readMap(map);
}
- }
- if (messageAnnotations) {
- qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
- if (nestAnnotations) {
- map["x-amqp-message-annotations"] = decoder.readMap();
- } else {
- decoder.readMap(map);
+ //add in 'x-amqp-' prefixed values
+ if (!!firstAcquirer) {
+ map["x-amqp-first-acquirer"] = firstAcquirer.get();
+ }
+ if (!!deliveryCount) {
+ map["x-amqp-delivery-count"] = deliveryCount.get();
+ }
+ if (to) {
+ map["x-amqp-to"] = to.str();
+ }
+ if (!!absoluteExpiryTime) {
+ map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
+ }
+ if (!!creationTime) {
+ map["x-amqp-creation-time"] = creationTime.get();
+ }
+ if (groupId) {
+ map["x-amqp-group-id"] = groupId.str();
+ }
+ if (!!groupSequence) {
+ map["x-amqp-qroup-sequence"] = groupSequence.get();
}
+ if (replyToGroupId) {
+ map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
+ }
+ //add in any annotations
+ if (deliveryAnnotations) {
+ qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-delivery-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ if (messageAnnotations) {
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ if (nestAnnotations) {
+ map["x-amqp-message-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
+ }
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
@@ -169,7 +178,15 @@ qpid::amqp::CharSequence EncodedMessage:
void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const
{
- a = qpid::messaging::Address(replyTo.str());
+ std::string rt = replyTo.str();
+ std::string::size_type i = rt.find('/');
+ if (i != std::string::npos && i > 0 && rt.find('/', i+1) == std::string::npos) {
+ //handle <name>/<subject> special case
+ a.setName(rt.substr(0, i));
+ a.setSubject(rt.substr(i+1));
+ } else {
+ a.setName(rt);
+ }
}
void EncodedMessage::getSubject(std::string& s) const
{
@@ -193,35 +210,39 @@ void EncodedMessage::getCorrelationId(st
}
void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- if (!content.isVoid()) {
- c = content;//integer types, floats, bool etc
- //TODO: populate raw data?
- } else {
- if (bodyType.empty()
- || bodyType == qpid::amqp::typecodes::BINARY_NAME
- || bodyType == qpid::types::encodings::UTF8
- || bodyType == qpid::types::encodings::ASCII)
- {
- c = std::string(body.data, body.size);
- c.setEncoding(bodyType);
- } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
- qpid::amqp::ListBuilder builder;
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getList();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
- qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
- qpid::amqp::Decoder decoder(body.data, body.size);
- decoder.read(builder);
- c = builder.getValue().asMap();
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
- if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
- raw.assign(body.data, body.size);
- } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
- raw.assign(body.data, body.size);
+ try {
+ if (!content.isVoid()) {
+ c = content;//integer types, floats, bool etc
+ //TODO: populate raw data?
+ } else {
+ if (bodyType.empty()
+ || bodyType == qpid::amqp::typecodes::BINARY_NAME
+ || bodyType == qpid::types::encodings::UTF8
+ || bodyType == qpid::types::encodings::ASCII)
+ {
+ c = std::string(body.data, body.size);
+ c.setEncoding(bodyType);
+ } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+ qpid::amqp::ListBuilder builder;
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getList();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+ qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+ qpid::amqp::Decoder decoder(body.data, body.size);
+ decoder.read(builder);
+ c = builder.getValue().asMap();
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+ if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+ raw.assign(body.data, body.size);
+ } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
+ raw.assign(body.data, body.size);
+ }
}
+ } catch (const qpid::Exception& e) {
+ throw FetchError(e.what());
}
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Mon Oct 21 22:04:51 2013
@@ -123,11 +123,6 @@ Address ReceiverContext::getAddress() co
return address;
}
-bool ReceiverContext::isClosed() const
-{
- return false;//TODO
-}
-
void ReceiverContext::reset(pn_session_t* session)
{
receiver = pn_receiver(session, name.c_str());
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Mon Oct 21 22:04:51 2013
@@ -55,7 +55,6 @@ class ReceiverContext
void close();
const std::string& getName() const;
const std::string& getSource() const;
- bool isClosed() const;
void configure();
void verify();
Address getAddress() const;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp Mon Oct 21 22:04:51 2013
@@ -100,7 +100,7 @@ qpid::messaging::Session ReceiverHandle:
bool ReceiverHandle::isClosed() const
{
- return receiver->isClosed();
+ return connection->isClosed(session, receiver);
}
Address ReceiverHandle::getAddress() const
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Mon Oct 21 22:04:51 2013
@@ -21,6 +21,8 @@
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/AddressImpl.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/Exception.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MapHandler.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -42,7 +44,7 @@ SenderContext::SenderContext(pn_session_
: name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {}
+ sender(pn_sender(session, n.c_str())), capacity(50), unreliable(helper.isUnreliable()) {}
SenderContext::~SenderContext()
{
@@ -435,59 +437,63 @@ void SenderContext::Delivery::reset()
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
- boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
+ try {
+ boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
- if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
- //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
- if (original->hasHeaderChanged(msg)) {
- //since as yet have no annotations, just write the revised header then the rest of the message as received
- encoded.resize(16/*max header size*/ + original->getBareMessage().size);
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
+ //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
+ if (original->hasHeaderChanged(msg)) {
+ //since as yet have no annotations, just write the revised header then the rest of the message as received
+ encoded.resize(16/*max header size*/ + original->getBareMessage().size);
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ HeaderAdapter header(msg);
+ encoder.writeHeader(header);
+ ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
+ } else {
+ //since as yet have no annotations, if the header hasn't
+ //changed and we still have the original bare message, can
+ //send the entire content as is
+ encoded.resize(original->getSize());
+ ::memcpy(encoded.getData(), original->getData(), original->getSize());
+ }
+ } else {
HeaderAdapter header(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
+ ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
+ //compute size:
+ size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ + qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
+ if (msg.getContent().isVoid()) {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
+ } else {
+ contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
+ }
+ encoded.resize(contentSize);
+ QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
+ qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
+ //write header:
encoder.writeHeader(header);
- ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
- } else {
- //since as yet have no annotations, if the header hasn't
- //changed and we still have the original bare message, can
- //send the entire content as is
- encoded.resize(original->getSize());
- ::memcpy(encoded.getData(), original->getData(), original->getSize());
- }
- } else {
- HeaderAdapter header(msg);
- PropertiesAdapter properties(msg, address.getSubject());
- ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
- //compute size:
- size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
- + qpid::amqp::MessageEncoder::getEncodedSize(properties)
- + qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
- if (msg.getContent().isVoid()) {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
- } else {
- contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
- }
- encoded.resize(contentSize);
- QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
- qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
- //write header:
- encoder.writeHeader(header);
- //write delivery-annotations, write message-annotations (none yet supported)
- //write properties
- encoder.writeProperties(properties);
- //write application-properties
- encoder.writeApplicationProperties(applicationProperties);
- //write body
- if (!msg.getContent().isVoid()) {
- //write as AmqpValue
- encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
- } else if (msg.getBytes().size()) {
- encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
- }
- if (encoder.getPosition() < encoded.getSize()) {
- QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
- encoded.trim(encoder.getPosition());
+ //write delivery-annotations, write message-annotations (none yet supported)
+ //write properties
+ encoder.writeProperties(properties);
+ //write application-properties
+ encoder.writeApplicationProperties(applicationProperties);
+ //write body
+ if (!msg.getContent().isVoid()) {
+ //write as AmqpValue
+ encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
+ } else if (msg.getBytes().size()) {
+ encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
+ }
+ if (encoder.getPosition() < encoded.getSize()) {
+ QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
+ encoded.trim(encoder.getPosition());
+ }
+ //write footer (no annotations yet supported)
}
- //write footer (no annotations yet supported)
+ } catch (const qpid::Exception& e) {
+ throw SendError(e.what());
}
}
void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Mon Oct 21 22:04:51 2013
@@ -140,6 +140,23 @@ void SessionContext::acknowledge(const q
}
}
+void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
+{
+ DeliveryMap::iterator i = unacked.find(id);
+ if (i != unacked.end()) {
+ if (reject) {
+ QPID_LOG(debug, "rejecting message with id=" << id);
+ pn_delivery_update(i->second, PN_REJECTED);
+ } else {
+ QPID_LOG(debug, "releasing message with id=" << id);
+ pn_delivery_update(i->second, PN_MODIFIED);
+ pn_disposition_set_failed(pn_delivery_local(i->second), true);
+ }
+ pn_delivery_settle(i->second);
+ unacked.erase(i);
+ }
+}
+
bool SessionContext::settled()
{
bool result = true;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Mon Oct 21 22:04:51 2013
@@ -79,6 +79,7 @@ class SessionContext
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+ void nack(const qpid::framing::SequenceNumber& id, bool reject);
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Mon Oct 21 22:04:51 2013
@@ -57,18 +57,17 @@ void SessionHandle::acknowledge(bool /*s
void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
{
- //TODO: handle cumulative
connection->acknowledge(session, &msg, cumulative);
}
-void SessionHandle::reject(qpid::messaging::Message&)
+void SessionHandle::reject(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, true);
}
-void SessionHandle::release(qpid::messaging::Message&)
+void SessionHandle::release(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, false);
}
void SessionHandle::close()
Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessageTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessageTest.cpp Mon Oct 21 22:04:51 2013
@@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
msg = registry.decode(buffer);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
- BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+ BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContent().size());
BOOST_CHECK_EQUAL(data, msg.getContent());
//BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Oct 21 22:04:51 2013
@@ -397,6 +397,14 @@ QPID_AUTO_TEST_CASE(testBrowse)
receive(browser1, 10);
Receiver browser2 = fix.session.createReceiver(fix.queue + "; {mode:browse}");
receive(browser2, 10);
+ Receiver releaser1 = fix.session.createReceiver(fix.queue);
+ Message m1 = releaser1.fetch(messaging::Duration::SECOND*5);
+ BOOST_CHECK(!m1.getRedelivered());
+ fix.session.release(m1);
+ Receiver releaser2 = fix.session.createReceiver(fix.queue);
+ Message m2 = releaser2.fetch(messaging::Duration::SECOND*5);
+ BOOST_CHECK(m2.getRedelivered());
+ fix.session.release(m2);
Receiver consumer = fix.session.createReceiver(fix.queue);
receive(consumer, 10);
fix.session.acknowledge();
@@ -738,6 +746,7 @@ QPID_AUTO_TEST_CASE(testRelease)
Message m2 = receiver.fetch(Duration::SECOND * 1);
BOOST_CHECK_EQUAL(m1.getContent(), out.getContent());
BOOST_CHECK_EQUAL(m1.getContent(), m2.getContent());
+ BOOST_CHECK(m2.getRedelivered());
fix.session.acknowledge(true);
}
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Oct 21 22:04:51 2013
@@ -77,8 +77,14 @@ public:
Message createMessage(uint32_t size)
{
static uint32_t seqNum;
- Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
- msg.setSequence(++seqNum);
+ //Need to compute what data size is required to make a given
+ //overall size (use one byte of content in test message to ensure
+ //content frame is added)
+ Message test = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string("x"));
+ size_t min = test.getMessageSize() - 1;
+ if (min > size) throw qpid::Exception("Can't create message that small!");
+ Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size - min, 'x'));
+ msg.setSequence(++seqNum);//this doesn't affect message size
return msg;
}
}
@@ -100,18 +106,18 @@ QPID_AUTO_TEST_CASE(testFlowCount)
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control
@@ -136,69 +142,69 @@ QPID_AUTO_TEST_CASE(testFlowCount)
QPID_AUTO_TEST_CASE(testFlowSize)
{
FieldTable args;
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 700);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 460);
std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
- BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize());
- BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize());
+ BOOST_CHECK_EQUAL((uint32_t) 700, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 460, flow->getFlowResumeSize());
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 600 on queue
BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(600u, flow->getFlowSize());
- Message msg_9 = createMessage(9);
- flow->enqueued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
- Message tinyMsg_1 = createMessage(1);
+ Message msg_50 = createMessage(50);
+ flow->enqueued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 650 on queue
+ Message tinyMsg_1 = createMessage(40);
flow->enqueued(tinyMsg_1);
- BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 690 on queue
- Message tinyMsg_2 = createMessage(1);
+ Message tinyMsg_2 = createMessage(40);
flow->enqueued(tinyMsg_2);
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
- msgs.push_back(createMessage(10));
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue, ON
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 830 on queue
BOOST_CHECK_EQUAL(10u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(81u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(830u, flow->getFlowSize());
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 630 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 530 on queue
flow->dequeued(tinyMsg_1);
- BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 490 on queue
flow->dequeued(tinyMsg_2);
- BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF
+ BOOST_CHECK(!flow->isFlowControlActive()); // 450 on queue, OFF
- flow->dequeued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue
+ flow->dequeued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 400 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 300 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 200 on queue
BOOST_CHECK_EQUAL(2u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(20u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(200u, flow->getFlowSize());
}
QPID_AUTO_TEST_CASE(testFlowArgs)
@@ -227,13 +233,13 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 10);
args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 2000);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 1000);
- std::deque<Message> msgs_1;
- std::deque<Message> msgs_10;
std::deque<Message> msgs_50;
std::deque<Message> msgs_100;
+ std::deque<Message> msgs_500;
+ std::deque<Message> msgs_1000;
Message msg;
@@ -243,104 +249,104 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
// verify flow control comes ON when only count passes its stop point.
for (size_t i = 0; i < 10; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:10 size:100
+ // count:10 size:1000
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 6; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:5 size: 41
+ // count:5 size: 450
- flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count: 4 size: 400 ->OFF
+ msgs_50.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
for (size_t i = 0; i < 4; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
// count:0 size:0
// verify flow control comes ON when only size passes its stop point.
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:1 size: 100
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:1 size: 1000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:2 size: 150
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:2 size: 1500
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:3 size: 200
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:3 size: 2000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:4 size: 2050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_100.front()); // count:3 size:101
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:3 size:1050
+ msgs_1000.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_1.front()); // count:2 size:100
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count:2 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:1 size:500 ->OFF
+ msgs_500.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
// verify flow control remains ON until both thresholds drop below their
// resume point.
for (size_t i = 0; i < 8; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:9 size:130
+ // count:9 size:1300
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back()); // count:10 size: 140
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back()); // count:10 size: 1400
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1450 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:12 size: 2450 (both thresholds crossed)
BOOST_CHECK(flow->isFlowControlActive());
- // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
+ // at this point: 9@100 + 1@500 + 1@1000 + 1@50 == 12@2450
- flow->dequeued(msgs_50.front()); // count:11 size:191
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:11 size:1950
+ msgs_500.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 9; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:2 size:101
- flow->dequeued(msgs_1.front()); // count:1 size:100
- msgs_1.pop_front();
+ // count:2 size:1050
+ flow->dequeued(msgs_50.front()); // count:1 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // still active due to size
- flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:0 size:0 ->OFF
+ msgs_1000.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/QueueOptionsTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/QueueOptionsTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/QueueOptionsTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/QueueOptionsTest.cpp Mon Oct 21 22:04:51 2013
@@ -63,16 +63,9 @@ QPID_AUTO_TEST_CASE(testFlags)
{
QueueOptions ft;
- ft.setPersistLastNode();
ft.setOrdering(LVQ);
-
- BOOST_CHECK(1 == ft.getAsInt(QueueOptions::strPersistLastNode));
BOOST_CHECK(1 == ft.getAsInt(QueueOptions::strLastValueQueue));
-
- ft.clearPersistLastNode();
ft.setOrdering(FIFO);
-
- BOOST_CHECK(!ft.isSet(QueueOptions::strPersistLastNode));
BOOST_CHECK(!ft.isSet(QueueOptions::strLastValueQueue));
}
@@ -87,16 +80,6 @@ QPID_AUTO_TEST_CASE(testSetOrdering)
}
-QPID_AUTO_TEST_CASE(testClearPersistLastNode)
-{
- //ensure clear works even if not preceded by the setting on the
- //option
- QueueOptions ft;
- ft.clearPersistLastNode();
- BOOST_CHECK(!ft.isSet(QueueOptions::strPersistLastNode));
-}
-
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Oct 21 22:04:51 2013
@@ -69,9 +69,15 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
QPID_AUTO_TEST_CASE(testRingPolicySize)
{
- std::string hundredBytes = std::string(100, 'h');
- std::string fourHundredBytes = std::string (400, 'f');
- std::string thousandBytes = std::string(1000, 't');
+ //The message size now includes all headers as well as the content
+ //aka body, so compute the amount of data needed to hit a given
+ //overall size
+ std::string q("my-ring-queue");
+ size_t minMessageSize = 25/*minimum size of headers*/ + q.size()/*routing key length*/ + 4/*default exchange, added by broker*/;
+
+ std::string hundredBytes = std::string(100 - minMessageSize, 'h');
+ std::string fourHundredBytes = std::string (400 - minMessageSize, 'f');
+ std::string thousandBytes = std::string(1000 - minMessageSize, 't');
// Ring queue, 500 bytes maxSize
@@ -79,7 +85,6 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
args.setSizePolicy(RING, 500, 0);
SessionFixture f;
- std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
// A. Send messages 0 .. 5, each 100 bytes
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/brokertest.py Mon Oct 21 22:04:51 2013
@@ -424,7 +424,6 @@ class BrokerTest(TestCase):
ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
amqp_lib = os.getenv("AMQP_LIB")
- amqpc_lib = os.getenv("AMQPC_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
receiver_exec = os.getenv("RECEIVER_EXEC")
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/interlink_tests.py Mon Oct 21 22:04:51 2013
@@ -46,7 +46,6 @@ class AmqpBrokerTest(BrokerTest):
"""
def setUp(self):
BrokerTest.setUp(self)
- os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
self.port_holder = HaPort(self)
self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/CMakeLists.txt?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/CMakeLists.txt Mon Oct 21 22:04:51 2013
@@ -70,48 +70,119 @@ set (qpid_test_boost_libs
# womp on each other's store directory.
#
-# define_selftest
+# define_legacystore_test
# macro to accept the name of a single source file and to create a
# unit test executable that runs the source.
#
-MACRO (define_selftest theSourceFile)
+MACRO (define_legacystore_test theSourceFile)
add_executable (legacystore_${theSourceFile}
- unit_test
${theSourceFile}
+ unit_test
${platform_test_additions})
target_link_libraries (legacystore_${theSourceFile}
${qpid_test_boost_libs}
- qpidmessaging qpidbroker qmfconsole legacystore)
-get_property(ls_include TARGET legacystore_${theSourceFile} PROPERTY INCLUDE_DIRECTORIES)
-list(APPEND ls_include ${abs_top_srcdir}/src/qpid/legacystore)
-list(APPEND ls_include ${abs_top_srcdir}/src/tests)
-set_target_properties (legacystore_${theSourceFile} PROPERTIES
- INCLUDE_DIRECTORIES "${ls_include}"
- COMPILE_DEFINITIONS _IN_QPID_BROKER)
+ qpidmessaging qpidtypes qpidbroker qpidcommon legacystore_shared)
+set_target_properties (legacystore_${theSourceFile} PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
remember_location(legacystore_${theSourceFile})
-set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix})
add_test (legacystore_${theSourceFile} ${test_wrap} ${legacystore_${theSourceFile}_LOCATION})
-ENDMACRO (define_selftest)
+ENDMACRO (define_legacystore_test)
-# add_definitions(-H)
+define_legacystore_test (SimpleTest)
+define_legacystore_test (OrderingTest)
+define_legacystore_test (TransactionalTest)
+define_legacystore_test (TwoPhaseCommitTest)
+
+# Journal tests
+MACRO (define_journal_test mainSourceFile)
+if ("${ARGV1}" STREQUAL "LONG")
+ set (testname "journal_long_${mainSourceFile}")
+else ()
+ set (testname "journal_${mainSourceFile}")
+endif ()
+add_executable (${testname}
+ jrnl/${mainSourceFile}
+ unit_test
+ ${platform_test_additions})
+target_link_libraries (${testname}
+ ${qpid_test_boost_libs}
+ ${clock_gettime_LIB} legacystore_shared)
+if ("${ARGV1}" STREQUAL "LONG")
+ set_target_properties(${testname} PROPERTIES COMPILE_DEFINITIONS LONG_TEST)
+endif ()
+remember_location(${testname})
+add_test (${testname} ${test_wrap} ${${testname}_LOCATION})
+unset (testname)
+ENDMACRO (define_journal_test)
+
+define_journal_test (_ut_time_ns)
+define_journal_test (_ut_jexception)
+define_journal_test (_ut_jerrno)
+define_journal_test (_ut_rec_hdr)
+define_journal_test (_ut_jinf)
+define_journal_test (_ut_jdir)
+define_journal_test (_ut_enq_map)
+define_journal_test (_ut_txn_map)
+define_journal_test (_ut_lpmgr)
+define_journal_test (_st_basic)
+define_journal_test (_st_basic_txn)
+define_journal_test (_st_read)
+define_journal_test (_st_read_txn)
+define_journal_test (_st_auto_expand)
+define_journal_test (_ut_lpmgr LONG)
+define_journal_test (_st_basic LONG)
+define_journal_test (_st_read LONG)
+
+add_executable(jtt
+ jrnl/jtt/args.cpp
+ jrnl/jtt/data_src.cpp
+ jrnl/jtt/jrnl_init_params.cpp
+ jrnl/jtt/jrnl_instance.cpp
+ jrnl/jtt/main.cpp
+ jrnl/jtt/read_arg.cpp
+ jrnl/jtt/test_case.cpp
+ jrnl/jtt/test_case_result.cpp
+ jrnl/jtt/test_case_result_agregation.cpp
+ jrnl/jtt/test_case_set.cpp
+ jrnl/jtt/test_mgr.cpp)
+
+target_link_libraries (jtt
+ ${Boost_PROGRAM_OPTIONS_LIBRARY}
+ ${clock_gettime_LIB} legacystore_shared)
+
+add_test(journal_jtt ${CMAKE_CURRENT_BINARY_DIR}/jtt -c ${CMAKE_CURRENT_SOURCE_DIR}/jrnl/jtt/jtt.csv)
+
+add_executable (jtt__ut
+ jrnl/jtt/_ut_data_src.cpp
+ jrnl/jtt/_ut_jrnl_init_params.cpp
+ jrnl/jtt/_ut_read_arg.cpp
+ jrnl/jtt/_ut_jrnl_instance.cpp
+ jrnl/jtt/_ut_test_case.cpp
+ jrnl/jtt/_ut_test_case_result.cpp
+ jrnl/jtt/_ut_test_case_result_agregation.cpp
+ jrnl/jtt/_ut_test_case_set.cpp
+ jrnl/jtt/args.cpp
+ jrnl/jtt/data_src.cpp
+ jrnl/jtt/jrnl_init_params.cpp
+ jrnl/jtt/jrnl_instance.cpp
+ jrnl/jtt/read_arg.cpp
+ jrnl/jtt/test_case.cpp
+ jrnl/jtt/test_case_set.cpp
+ jrnl/jtt/test_case_result.cpp
+ jrnl/jtt/test_case_result_agregation.cpp
+ unit_test.cpp)
+
+target_link_libraries (jtt__ut
+ ${qpid_test_boost_libs}
+ ${Boost_PROGRAM_OPTIONS_LIBRARY}
+ ${clock_gettime_LIB} legacystore_shared)
-define_selftest (SimpleTest)
-define_selftest (OrderingTest)
-define_selftest (TransactionalTest)
-define_selftest (TwoPhaseCommitTest)
+add_test(NAME journal_jtt_ut WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/jrnl/jtt COMMAND ${CMAKE_CURRENT_BINARY_DIR}/jtt__ut)
#
# Other test programs
#
-# This should ideally be done as part of the test run, but I don't know a way
-# to get these arguments and the working directory set like Makefile.am does,
-# and have that run during the test pass.
-if (PYTHON_EXECUTABLE)
- set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python)
- execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${pythoon_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
- WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python)
-endif (PYTHON_EXECUTABLE)
+add_test (legacystore_python_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_python_tests${test_script_suffix})
endif (BUILD_LEGACYSTORE)
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/OrderingTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/OrderingTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/OrderingTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/OrderingTest.cpp Mon Oct 21 22:04:51 2013
@@ -20,16 +20,18 @@
*/
#include "unit_test.h"
+#include "MessageUtils.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/broker/PersistableObject.h"
+#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/legacystore/MessageStoreImpl.h"
-#include <iostream>
-#include "MessageUtils.h"
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
-#include <qpid/framing/AMQHeaderBody.h>
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
+#include <iostream>
+
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -48,7 +50,7 @@ QPID_AUTO_TEST_SUITE(OrderingTest)
const std::string test_filename("OrderingTest");
const char* tdp = getenv("TMP_DATA_DIR");
-const std::string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/OrderingTest");
+const std::string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/var/tmp/OrderingTest");
// === Helper fns ===
@@ -118,7 +120,8 @@ void restart()
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ RecoveredObjects ro;
+ RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, br.getProtocolRegistry(), ro);
store->recover(recoveryMgr);
queue = queues.find(name);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/SimpleTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/SimpleTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/SimpleTest.cpp Mon Oct 21 22:04:51 2013
@@ -20,21 +20,23 @@
*/
#include "unit_test.h"
+#include "MessageUtils.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/broker/PersistableObject.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/legacystore/MessageStoreImpl.h"
-#include <iostream>
-#include "tests/legacystore/MessageUtils.h"
#include "qpid/legacystore/StoreException.h"
-#include "qpid/broker/DirectExchange.h"
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/QueueSettings.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
+#include <iostream>
+
qpid::broker::Broker::Options opts;
qpid::broker::Broker br(opts);
@@ -57,15 +59,15 @@ QPID_AUTO_TEST_SUITE(SimpleTest)
const string test_filename("SimpleTest");
const char* tdp = getenv("TMP_DATA_DIR");
-const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/var/tmp/SimpleTest");
// === Helper fns ===
-struct DummyHandler : OutputHandler
+struct DummyHandler : FrameHandler
{
std::vector<AMQFrame> frames;
- virtual void send(AMQFrame& frame){
+ virtual void handle(AMQFrame& frame){
frames.push_back(frame);
}
};
@@ -75,7 +77,8 @@ void recover(MessageStoreImpl& store, Qu
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (&store);
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ RecoveredObjects ro;
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry(), ro);
store.recover(recovery);
}
@@ -232,8 +235,8 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSetti
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
- BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
- BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202u);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003u);
BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
}
@@ -307,7 +310,7 @@ QPID_AUTO_TEST_CASE(Enqueue)
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
- BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
+ BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContent().size());
DummyHandler handler;
MessageUtils::deliver(msg, handler, 100);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp Mon Oct 21 22:04:51 2013
@@ -20,18 +20,20 @@
*/
#include "unit_test.h"
-
-#include "qpid/legacystore/MessageStoreImpl.h"
-#include <iostream>
#include "MessageUtils.h"
-#include "qpid/legacystore/StoreException.h"
+
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/broker/PersistableObject.h"
#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include "qpid/legacystore/StoreException.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
+#include <iostream>
+
using namespace mrg::msgstore;
using namespace qpid;
using namespace qpid::broker;
@@ -53,7 +55,7 @@ QPID_AUTO_TEST_SUITE(TransactionalTest)
const string test_filename("TransactionalTest");
const char* tdp = getenv("TMP_DATA_DIR");
-const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/var/tmp/TransactionalTest");
// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
class TestTxnCtxt : public TxnCtxt
@@ -141,7 +143,8 @@ void restart()
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry());
+ RecoveredObjects ro;
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry(), ro);
store->recover(recovery);
queueA = queues->find(nameA);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp Mon Oct 21 22:04:51 2013
@@ -20,18 +20,20 @@
*/
#include "unit_test.h"
-
-#include "qpid/legacystore/MessageStoreImpl.h"
-#include <iostream>
#include "MessageUtils.h"
+
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/broker/PersistableObject.h"
#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/log/Statement.h"
+#include "qpid/legacystore/MessageStoreImpl.h"
#include "qpid/legacystore/TxnCtxt.h"
#include "qpid/log/Logger.h"
+#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
+#include <iostream>
+
using namespace mrg::msgstore;
using namespace qpid;
using namespace qpid::broker;
@@ -54,7 +56,7 @@ QPID_AUTO_TEST_SUITE(TwoPhaseCommitTest)
const string test_filename("TwoPhaseCommitTest");
const char* tdp = getenv("TMP_DATA_DIR");
-string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TwoPhaseCommitTest");
+string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/var/tmp/TwoPhaseCommitTest");
// === Helper fns ===
@@ -386,7 +388,8 @@ class TwoPhaseCommitTest
links = std::auto_ptr<LinkRegistry>(new LinkRegistry);
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t));
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry());
+ RecoveredObjects ro;
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry(), ro);
store->recover(recovery);
queueA = queues->find(nameA);
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/run_python_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/run_python_tests?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/run_python_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/run_python_tests Mon Oct 21 22:04:51 2013
@@ -18,47 +18,25 @@
# under the License.
#
-if test -z ${QPID_DIR} ; then
- cat <<EOF
+source ../test_env.sh
- =========== WARNING: PYTHON TESTS DISABLED ==============
-
- QPID_DIR not set.
-
- ===========================================================
-
-EOF
- exit
-fi
-
-. `dirname $0`/tests_env.sh
+#Add our directory to the python path
+export PYTHONPATH=$srcdir/legacystore:$PYTHONPATH
MODULENAME=python_tests
echo "Running Python tests in module ${MODULENAME}..."
-case x$1 in
- xSHORT_TEST)
- DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_browse_recover *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
- xLONG_TEST)
- DEFAULT_PYTHON_TESTS= ;;
- x)
- DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.* *.federation.*" ;;
- *)
- DEFAULT_PYTHON_TESTS=$1
-esac
-
-PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+test -d $PYTHON_DIR || { echo "Skipping python tests, no python dir."; exit 0; }
+QPID_PORT=${QPID_PORT:-5672}
+FAILING=${FAILING:-/dev/null}
+PYTHON_TESTS=${PYTHON_TESTS:-$*}
OUTDIR=${MODULENAME}.tmp
rm -rf $OUTDIR
# To debug a test, add the following options to the end of the following line:
# -v DEBUG -c qpid.messaging.io.ops [*.testName]
-${PYTHON_DIR}/qpid-python-test -m ${MODULENAME} -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR #-v DEBUG
-RETCODE=$?
+${QPID_PYTHON_TEST} -m ${MODULENAME} -I $FAILING -DOUTDIR=$OUTDIR \
+ $PYTHON_TEST || exit 1
-if test x${RETCODE} != x0; then
- exit 1;
-fi
-exit 0
Propchange: qpid/branches/linearstore/qpid/cpp/src/tests/legacystore/run_python_tests
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/ping_broker
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/ping_broker?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/ping_broker (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/ping_broker Mon Oct 21 22:04:51 2013
@@ -57,7 +57,7 @@ def OptionsAndArguments(argv):
parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
help="Maximum time to wait for broker connection (in seconds)")
parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
- help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
parser.add_option("--ssl-trustfile", action="store", type="string", metavar="<CA>", help="List of trusted CAs (PEM Format)")
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/qpid-cluster-benchmark Mon Oct 21 22:04:51 2013
@@ -1,5 +1,5 @@
#!/bin/sh
-echo#
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/swig_python_tests
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/swig_python_tests?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/swig_python_tests (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/swig_python_tests Mon Oct 21 22:04:51 2013
@@ -50,10 +50,10 @@ start_broker
echo "Running swigged python tests using broker on port $QPID_PORT"
export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG
+export QPID_USE_SWIG_CLIENT=1
$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
-if [[ -a $AMQPC_LIB ]] ; then
- export QPID_LOAD_MODULE=$AMQPC_LIB
- $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+if [[ -a $AMQP_LIB ]] ; then
+ $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
fi
stop_broker
if [[ $FAILED -eq 1 ]]; then
Modified: qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/linearstore/qpid/cpp/src/tests/test_env.sh.in Mon Oct 21 22:04:51 2013
@@ -67,7 +67,6 @@ exportmodule HA_LIB ha.so
exportmodule XML_LIB xml.so
exportmodule STORE_LIB legacystore.so
exportmodule AMQP_LIB amqp.so
-exportmodule AMQPC_LIB amqpc.so
# Qpid options
export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org