You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/04/07 23:22:56 UTC
svn commit: r1585588 [1/2] - in /qpid/trunk/qpid: cpp/bindings/qpid/python/
cpp/include/qpid/ cpp/include/qpid/messaging/ cpp/src/ cpp/src/qpid/broker/
cpp/src/qpid/broker/amqp/ cpp/src/qpid/ha/ cpp/src/qpid/messaging/
cpp/src/qpid/messaging/amqp/ cpp/...
Author: aconway
Date: Mon Apr 7 21:22:55 2014
New Revision: 1585588
URL: http://svn.apache.org/r1585588
Log:
QPID-5560: HA tests do not use AMQP 1.0
The HA tests were using only AMQP 0-10.
Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available)
Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation.
Summary of changes:
- brokertest.py: configurable support for swig vs. native and amqp0-10 vs. 1.0
- default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise
- qpidtoollibs/broker.py: enable use of swig client with BrokerAgent
- Swig python client:
- support for passing client_properties/properties.
- expose AddressHelper pn_data read/write as PnData helper class
- set sender/receiver capacity on creation
- limited disposition support - rejected messages.
- support for additional timeout parameters
- expose messaging::Logger, allow log configuration to be set from python.
- ha_tests.py:
- bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF.
- pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive)
- TX tests force use of 0-10 protocol (but still with Swig client if enabled.)
- Broker fixes:
- Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue.
- broker::amqp::Session was always setting an exclusive owner in createQueue
Added:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp (with props)
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h (with props)
Modified:
qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i
qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h
qpid/trunk/qpid/cpp/include/qpid/qpid.i
qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i
qpid/trunk/qpid/cpp/src/amqp.cmake
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/tests/brokertest.py
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
qpid/trunk/qpid/cpp/src/tests/interlink_tests.py
qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py
qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
qpid/trunk/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py
Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i (original)
+++ qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i Mon Apr 7 21:22:55 2014
@@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
* don't even know why there is a non-const version of the method. */
%rename(opened) qpid::messaging::Connection::isOpen();
%rename(_close) qpid::messaging::Connection::close();
-%rename(receiver) qpid::messaging::Session::createReceiver;
+%rename(_receiver) qpid::messaging::Session::createReceiver;
%rename(_sender) qpid::messaging::Session::createSender;
%rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool);
%rename(_acknowledge_msg) qpid::messaging::Session::acknowledge(
@@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
%rename(_getTtl) qpid::messaging::Message::getTtl;
%rename(_setTtl) qpid::messaging::Message::setTtl;
+%rename(_sync) qpid::messaging::Session::sync;
+
+// Capitalize constant names correctly for python
+%rename(TRACE) qpid::messaging::trace;
+%rename(DEBUG) qpid::messaging::debug;
+%rename(INFO) qpid::messaging::info;
+%rename(NOTICE) qpid::messaging::notice;
+%rename(WARNING) qpid::messaging::warning;
+%rename(ERROR) qpid::messaging::error;
+%rename(CRITICAL) qpid::messaging::critical;
%include "qpid/qpid.i"
@@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
%pythoncode %{
@staticmethod
- def establish(url=None, **options) :
+ def establish(url=None, timeout=None, **options) :
+ if timeout and "reconnect-timeout" not in options:
+ options["reconnect-timeout"] = timeout
conn = Connection(url, **options)
conn.open()
return conn
%}
}
+%pythoncode %{
+ # Disposition class from messaging/message.py
+ class Disposition:
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+ # Consntants from messaging/constants.py
+ __SELF__ = object()
+
+ class Constant:
+
+ def __init__(self, name, value=__SELF__):
+ self.name = name
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+ AMQP_PORT = 5672
+ AMQPS_PORT = 5671
+
+ UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+ REJECTED = Constant("REJECTED")
+ RELEASED = Constant("RELEASED")
+%}
+
%extend qpid::messaging::Session {
%pythoncode %{
def acknowledge(self, message=None, disposition=None, sync=True) :
- if disposition :
- raise Exception("SWIG does not support dispositions yet. Use "
- "Session.reject and Session.release instead")
if message :
- self._acknowledge_msg(message, sync)
+ if disposition is None: self._acknowledge_msg(message, sync)
+ # FIXME aconway 2014-02-11: the following does not repsect the sync flag.
+ elif disposition.type == REJECTED: self.reject(message)
+ elif disposition.type == RELEASED: self.release(message)
else :
+ if disposition : # FIXME aconway 2014-02-11: support this
+ raise Exception("SWIG does not support dispositions yet. Use "
+ "Session.reject and Session.release instead")
self._acknowledge_all(sync)
__swig_getmethods__["connection"] = getConnection
if _newclass: connection = property(getConnection)
- def sender(self, target, **options) :
- s = self._sender(target)
- s._setDurable(options.get("durable"))
- return s
+ def receiver(self, source, capacity=None):
+ r = self._receiver(source)
+ if capacity is not None: r.capacity = capacity
+ return r
+
+ def sender(self, target, durable=None, capacity=None) :
+ s = self._sender(target)
+ if capacity is not None: s.capacity = capacity
+ s._setDurable(durable)
+ return s
def next_receiver(self, timeout=None) :
if timeout is None :
@@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
# Python API uses timeouts in seconds,
# but C++ API uses milliseconds
return self._next_receiver(Duration(int(1000*timeout)))
+
+ def sync(self, timeout=None):
+ if timeout == 0: self._sync(False) # Non-blocking sync
+ else: self._sync(True) # Blocking sync, C++ has not timeout.
+
%}
}
@@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
message.durable = self.durable
return self._send(message, sync)
+ def sync(self, timeout=None): self.session.sync(timeout)
+
__swig_getmethods__["capacity"] = getCapacity
__swig_setmethods__["capacity"] = setCapacity
if _newclass: capacity = property(getCapacity, setCapacity)
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h Mon Apr 7 21:22:55 2014
@@ -103,7 +103,7 @@ public:
* --log-time ("on"|"off|"0"|"1")
* --log-level ("on"|"off|"0"|"1")
* --log-source ("on"|"off|"0"|"1")
- * --log-thread ("on"|"off|"0"|"1")
+ * --log-thread ("on"|"off|"0"|"1")
* --log-function ("on"|"off|"0"|"1")
* --log-hires-timestamp ("on"|"off|"0"|"1")
*
Modified: qpid/trunk/qpid/cpp/include/qpid/qpid.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/qpid.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/qpid.i (original)
+++ qpid/trunk/qpid/cpp/include/qpid/qpid.i Mon Apr 7 21:22:55 2014
@@ -53,6 +53,7 @@ struct mystr
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Duration.h>
#include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/messaging/Logger.h>
//
// Wrapper functions for map-decode and list-decode. This allows us to avoid
@@ -84,6 +85,7 @@ qpid::types::Variant::List& decodeList(c
%include <qpid/messaging/Session.h>
%include <qpid/messaging/Connection.h>
%include <qpid/messaging/FailoverUpdates.h>
+%include <qpid/messaging/Logger.h>
qpid::types::Variant::Map& decodeMap(const qpid::messaging::Message&);
qpid::types::Variant::List& decodeList(const qpid::messaging::Message&);
Modified: qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i (original)
+++ qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i Mon Apr 7 21:22:55 2014
@@ -452,3 +452,10 @@ typedef int Py_ssize_t;
$1 = PyInt_Check($input) ? 1 : 0;
}
+
+/**
+ * argc,argv as python list
+ */
+
+%include <argcargv.i>
+%apply (int ARGC, char **ARGV) { (int argc, const char *argv[]) }
Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Mon Apr 7 21:22:55 2014
@@ -120,6 +120,8 @@ if (BUILD_AMQP)
qpid/messaging/amqp/ConnectionHandle.cpp
qpid/messaging/amqp/DriverImpl.h
qpid/messaging/amqp/DriverImpl.cpp
+ qpid/messaging/amqp/PnData.h
+ qpid/messaging/amqp/PnData.cpp
qpid/messaging/amqp/ReceiverContext.h
qpid/messaging/amqp/ReceiverContext.cpp
qpid/messaging/amqp/ReceiverHandle.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Apr 7 21:22:55 2014
@@ -1292,12 +1292,13 @@ const std::string Broker::TCP_TRANSPORT(
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
- const QueueSettings& settings,
+ const QueueSettings& constSettings,
const OwnershipToken* owner,
const std::string& alternateExchange,
const std::string& userId,
const std::string& connectionId)
{
+ QueueSettings settings(constSettings); // So we can modify them
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
@@ -1335,6 +1336,10 @@ std::pair<boost::shared_ptr<Queue>, bool
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
+ // Identify queues that won't survive a failover: exclusive, auto-delete with no delay.
+ if (owner && settings.autodelete && !settings.autoDeleteDelay)
+ settings.isTemporary = true;
+
std::pair<Queue::shared_ptr, bool> result =
queues.declare(name, settings, alternate, false/*recovering*/,
owner, connectionId, userId);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Apr 7 21:22:55 2014
@@ -521,7 +521,7 @@ void Queue::markInUse(bool controlling)
else users.addOther();
}
-void Queue::releaseFromUse(bool controlling)
+void Queue::releaseFromUse(bool controlling, bool doDelete)
{
bool trydelete;
if (controlling) {
@@ -533,7 +533,7 @@ void Queue::releaseFromUse(bool controll
users.removeOther();
trydelete = isUnused(locker);
}
- if (trydelete) scheduleAutoDelete();
+ if (trydelete && doDelete) scheduleAutoDelete();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
@@ -577,11 +577,13 @@ void Queue::consume(Consumer::shared_ptr
if (mgmtObject != 0 && c->isCounted()) {
mgmtObject->inc_consumerCount();
}
- ManagementAgent* agent = broker->getManagementAgent();
- if (agent) {
- agent->raiseEvent(
- _qmf::EventSubscribe(connectionId, userId, name,
- c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+ if (broker) {
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) {
+ agent->raiseEvent(
+ _qmf::EventSubscribe(connectionId, userId, name,
+ c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+ }
}
}
@@ -589,6 +591,7 @@ void Queue::cancel(Consumer::shared_ptr
{
removeListener(c);
if(c->isCounted())
+
{
bool unused;
{
@@ -605,12 +608,12 @@ void Queue::cancel(Consumer::shared_ptr
if (mgmtObject != 0) {
mgmtObject->dec_consumerCount();
}
- if (unused && settings.autodelete) {
- scheduleAutoDelete();
- }
+ if (unused && settings.autodelete) scheduleAutoDelete();
+ }
+ if (broker) {
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
- ManagementAgent* agent = broker->getManagementAgent();
- if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
/**
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Apr 7 21:22:55 2014
@@ -363,7 +363,7 @@ class Queue : public boost::enable_share
* be created.
*/
QPID_BROKER_EXTERN void markInUse(bool controlling=false);
- QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false);
+ QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false, bool doDelete=true);
QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Apr 7 21:22:55 2014
@@ -283,8 +283,6 @@ void SessionAdapter::QueueHandlerImpl::d
} catch (const qpid::types::Exception& e) {
throw InvalidArgumentException(e.what());
}
- // Identify queues that won't survive a failover.
- settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay;
std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().createQueue(name, settings,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Apr 7 21:22:55 2014
@@ -264,7 +264,7 @@ Session::ResolvedNode Session::resolve(c
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
}
std::pair<boost::shared_ptr<Queue>, bool> result
- = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+ = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
node.queue = result.first;
node.created = result.second;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Apr 7 21:22:55 2014
@@ -156,7 +156,7 @@ QueueReplicator::QueueReplicator(HaBroke
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
// Don't allow backup queues to auto-delete, primary decides when to delete.
- if (q->isAutoDelete()) q->markInUse();
+ if (q->isAutoDelete()) q->markInUse(false);
dispatch[DequeueEvent::KEY] =
boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
@@ -352,13 +352,13 @@ void QueueReplicator::promoted() {
queue->getMessageInterceptors().add(
boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
// Process auto-deletes
- if (queue->isAutoDelete() && subscribed) {
+ if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
// Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
// which could delete the queue while it's still running it's destroyed logic.
boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ // See markInUse in ctor: release but don't delete if never used.
+ q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/);
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Apr 7 21:22:55 2014
@@ -64,8 +64,6 @@ class ReplicationTest
// Calculate level for objects that may not have replication set,
// including auto-delete/exclusive settings.
- ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
- ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
ReplicateLevel useLevel(const broker::Queue&) const;
ReplicateLevel useLevel(const broker::Exchange&) const;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp Mon Apr 7 21:22:55 2014
@@ -119,6 +119,8 @@ void ConnectionOptions::set(const std::s
nestAnnotations = value;
} else if (name == "set-to-on-send" || name == "set_to_on_send") {
setToOnSend = value;
+ } else if (name == "properties" || name == "client-properties" || name == "client_properties") {
+ properties = value.asMap();
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h Mon Apr 7 21:22:55 2014
@@ -47,6 +47,7 @@ struct ConnectionOptions : qpid::client:
std::string identifier;
bool nestAnnotations;
bool setToOnSend;
+ std::map<std::string, qpid::types::Variant> properties;
QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Mon Apr 7 21:22:55 2014
@@ -18,6 +18,8 @@
* under the License.
*
*/
+
+#include "PnData.h"
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
@@ -33,6 +35,7 @@ extern "C" {
#include <proton/engine.h>
}
+
namespace qpid {
namespace messaging {
namespace amqp {
@@ -239,179 +242,6 @@ bool replace(Variant::Map& map, const st
}
}
-void write(pn_data_t* data, const Variant& value);
-
-void write(pn_data_t* data, const Variant::Map& map)
-{
- pn_data_put_map(data);
- pn_data_enter(data);
- for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
- pn_data_put_string(data, convert(i->first));
- write(data, i->second);
- }
- pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant::List& list)
-{
- pn_data_put_list(data);
- pn_data_enter(data);
- for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- write(data, *i);
- }
- pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant& value)
-{
- switch (value.getType()) {
- case qpid::types::VAR_VOID:
- pn_data_put_null(data);
- break;
- case qpid::types::VAR_BOOL:
- pn_data_put_bool(data, value.asBool());
- break;
- case qpid::types::VAR_UINT64:
- pn_data_put_ulong(data, value.asUint64());
- break;
- case qpid::types::VAR_INT64:
- pn_data_put_long(data, value.asInt64());
- break;
- case qpid::types::VAR_DOUBLE:
- pn_data_put_double(data, value.asDouble());
- break;
- case qpid::types::VAR_STRING:
- pn_data_put_string(data, convert(value.asString()));
- break;
- case qpid::types::VAR_MAP:
- write(data, value.asMap());
- break;
- case qpid::types::VAR_LIST:
- write(data, value.asList());
- break;
- default:
- break;
- }
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value);
-bool read(pn_data_t* data, qpid::types::Variant& value)
-{
- return read(data, pn_data_type(data), value);
-}
-void readList(pn_data_t* data, qpid::types::Variant::List& value)
-{
- size_t count = pn_data_get_list(data);
- pn_data_enter(data);
- for (size_t i = 0; i < count && pn_data_next(data); ++i) {
- qpid::types::Variant e;
- if (read(data, e)) value.push_back(e);
- }
- pn_data_exit(data);
-}
-void readMap(pn_data_t* data, qpid::types::Variant::Map& value)
-{
- size_t count = pn_data_get_list(data);
- pn_data_enter(data);
- for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
- std::string key = convert(pn_data_get_symbol(data));
- pn_data_next(data);
- qpid::types::Variant e;
- if (read(data, e)) value[key]= e;
- }
- pn_data_exit(data);
-}
-void readArray(pn_data_t* data, qpid::types::Variant::List& value)
-{
- size_t count = pn_data_get_array(data);
- pn_type_t type = pn_data_get_array_type(data);
- pn_data_enter(data);
- for (size_t i = 0; i < count && pn_data_next(data); ++i) {
- qpid::types::Variant e;
- if (read(data, type, e)) value.push_back(e);
- }
- pn_data_exit(data);
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value)
-{
- switch (type) {
- case PN_NULL:
- if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
- return true;
- case PN_BOOL:
- value = pn_data_get_bool(data);
- return true;
- case PN_UBYTE:
- value = pn_data_get_ubyte(data);
- return true;
- case PN_BYTE:
- value = pn_data_get_byte(data);
- return true;
- case PN_USHORT:
- value = pn_data_get_ushort(data);
- return true;
- case PN_SHORT:
- value = pn_data_get_short(data);
- return true;
- case PN_UINT:
- value = pn_data_get_uint(data);
- return true;
- case PN_INT:
- value = pn_data_get_int(data);
- return true;
- case PN_CHAR:
- value = pn_data_get_char(data);
- return true;
- case PN_ULONG:
- value = pn_data_get_ulong(data);
- return true;
- case PN_LONG:
- value = pn_data_get_long(data);
- return true;
- case PN_TIMESTAMP:
- value = pn_data_get_timestamp(data);
- return true;
- case PN_FLOAT:
- value = pn_data_get_float(data);
- return true;
- case PN_DOUBLE:
- value = pn_data_get_double(data);
- return true;
- case PN_UUID:
- value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
- return true;
- case PN_BINARY:
- value = convert(pn_data_get_binary(data));
- value.setEncoding(qpid::types::encodings::BINARY);
- return true;
- case PN_STRING:
- value = convert(pn_data_get_string(data));
- value.setEncoding(qpid::types::encodings::UTF8);
- return true;
- case PN_SYMBOL:
- value = convert(pn_data_get_string(data));
- value.setEncoding(qpid::types::encodings::ASCII);
- return true;
- case PN_LIST:
- value = qpid::types::Variant::List();
- readList(data, value.asList());
- return true;
- break;
- case PN_MAP:
- value = qpid::types::Variant::Map();
- readMap(data, value.asMap());
- return true;
- case PN_ARRAY:
- value = qpid::types::Variant::List();
- readArray(data, value.asList());
- return true;
- case PN_DESCRIBED:
- case PN_DECIMAL32:
- case PN_DECIMAL64:
- case PN_DECIMAL128:
- default:
- return false;
- }
-
-}
-
const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
const uint32_t DEFAULT_TIMEOUT(0);
}
@@ -680,9 +510,9 @@ void AddressHelper::checkAssertion(pn_te
requested.erase(j->first);
}
} else if (key == AUTO_DELETE) {
- read(data, v);
+ PnData(data).read(v);
isAutoDeleted = v.asBool();
- } else if (j != requested.end() && (read(data, v) && v.asString() == j->second.asString())) {
+ } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) {
requested.erase(j->first);
}
}
@@ -815,7 +645,7 @@ void AddressHelper::configure(pn_link_t*
} else {
pn_data_put_ulong(filter, i->descriptorCode);
}
- write(filter, i->value);
+ PnData(filter).write(i->value);
pn_data_exit(filter);
}
pn_data_exit(filter);
@@ -902,7 +732,7 @@ void AddressHelper::setNodeProperties(pn
putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
} else {
pn_data_put_symbol(data, convert(i->first));
- write(data, i->second);
+ PnData(data).write(i->second);
}
}
pn_data_exit(data);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Apr 7 21:22:55 2014
@@ -20,6 +20,7 @@
*/
#include "ConnectionContext.h"
#include "DriverImpl.h"
+#include "PnData.h"
#include "ReceiverContext.h"
#include "Sasl.h"
#include "SenderContext.h"
@@ -49,6 +50,8 @@ extern "C" {
namespace qpid {
namespace messaging {
namespace amqp {
+using types::Variant;
+
namespace {
//remove conditional when 0.5 is no longer supported
@@ -872,13 +875,6 @@ namespace {
const std::string CLIENT_PROCESS_NAME("qpid.client_process");
const std::string CLIENT_PID("qpid.client_pid");
const std::string CLIENT_PPID("qpid.client_ppid");
-pn_bytes_t convert(const std::string& s)
-{
- pn_bytes_t result;
- result.start = const_cast<char*>(s.data());
- result.size = s.size();
- return result;
-}
}
void ConnectionContext::setProperties()
{
@@ -886,15 +882,21 @@ void ConnectionContext::setProperties()
pn_data_put_map(data);
pn_data_enter(data);
- pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
std::string processName = sys::SystemInfo::getProcessName();
- pn_data_put_string(data, convert(processName));
+ pn_data_put_string(data, PnData::str(processName));
- pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PID));
pn_data_put_int(data, sys::SystemInfo::getProcessId());
- pn_data_put_symbol(data, convert(CLIENT_PPID));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i)
+ {
+ pn_data_put_symbol(data, PnData::str(i->first));
+ PnData(data).write(i->second);
+ }
pn_data_exit(data);
}
Added: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp?rev=1585588&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp Mon Apr 7 21:22:55 2014
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PnData.h"
+#include "qpid/types/encodings.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using types::Variant;
+
+// Encode a Variant map as a map node in `data`: each key is written as a
+// string, immediately followed by its encoded value.
+void PnData::write(const Variant::Map& map)
+{
+    pn_data_put_map(data);
+    pn_data_enter(data);
+    Variant::Map::const_iterator entry = map.begin();
+    while (entry != map.end()) {
+        pn_data_put_string(data, str(entry->first));
+        write(entry->second);
+        ++entry;
+    }
+    pn_data_exit(data);
+}
+// Encode a Variant list as a list node in `data`, writing the elements in
+// iteration order.
+void PnData::write(const Variant::List& list)
+{
+    pn_data_put_list(data);
+    pn_data_enter(data);
+    Variant::List::const_iterator element = list.begin();
+    while (element != list.end()) {
+        write(*element);
+        ++element;
+    }
+    pn_data_exit(data);
+}
+// Encode a single Variant into `data`, dispatching on its runtime type.
+//
+// Scalars map onto the proton type of the same width; maps and lists recurse
+// through the container overloads. The narrower integer types and float are
+// handled explicitly: previously they fell into the default branch and were
+// dropped, which left a map key without a value when called from
+// write(const Variant::Map&).
+void PnData::write(const Variant& value)
+{
+    switch (value.getType()) {
+      case qpid::types::VAR_VOID:
+        pn_data_put_null(data);
+        break;
+      case qpid::types::VAR_BOOL:
+        pn_data_put_bool(data, value.asBool());
+        break;
+      case qpid::types::VAR_UINT8:
+        pn_data_put_ubyte(data, value.asUint8());
+        break;
+      case qpid::types::VAR_UINT16:
+        pn_data_put_ushort(data, value.asUint16());
+        break;
+      case qpid::types::VAR_UINT32:
+        pn_data_put_uint(data, value.asUint32());
+        break;
+      case qpid::types::VAR_UINT64:
+        pn_data_put_ulong(data, value.asUint64());
+        break;
+      case qpid::types::VAR_INT8:
+        pn_data_put_byte(data, value.asInt8());
+        break;
+      case qpid::types::VAR_INT16:
+        pn_data_put_short(data, value.asInt16());
+        break;
+      case qpid::types::VAR_INT32:
+        pn_data_put_int(data, value.asInt32());
+        break;
+      case qpid::types::VAR_INT64:
+        pn_data_put_long(data, value.asInt64());
+        break;
+      case qpid::types::VAR_FLOAT:
+        pn_data_put_float(data, value.asFloat());
+        break;
+      case qpid::types::VAR_DOUBLE:
+        pn_data_put_double(data, value.asDouble());
+        break;
+      case qpid::types::VAR_STRING:
+        pn_data_put_string(data, str(value.asString()));
+        break;
+      case qpid::types::VAR_MAP:
+        write(value.asMap());
+        break;
+      case qpid::types::VAR_LIST:
+        write(value.asList());
+        break;
+      default:
+        // NOTE(review): any remaining type (e.g. VAR_UUID) is still written
+        // as nothing at all; confirm whether callers can pass such values.
+        break;
+    }
+}
+
+// Decode the current node of `data` into `value`, using the node's own type.
+// Returns false when that type has no Variant equivalent.
+bool PnData::read(qpid::types::Variant& value)
+{
+    const pn_type_t current = pn_data_type(data);
+    return read(current, value);
+}
+
+// Decode the list at the current node, appending each element that can be
+// converted to `value`. Elements whose type cannot be converted are skipped.
+void PnData::readList(qpid::types::Variant::List& value)
+{
+    const size_t total = pn_data_get_list(data);
+    pn_data_enter(data);
+    size_t index = 0;
+    while (index < total && pn_data_next(data)) {
+        qpid::types::Variant element;
+        if (read(element)) {
+            value.push_back(element);
+        }
+        ++index;
+    }
+    pn_data_exit(data);
+}
+
+// Decode the map at the current node into `value`, one key/value pair at a
+// time. Pairs whose value type cannot be converted are skipped.
+void PnData::readMap(qpid::types::Variant::Map& value)
+{
+    // The current node is a map, so its size must be queried with
+    // pn_data_get_map (not pn_data_get_list). The count covers keys and
+    // values together, hence count/2 pairs.
+    size_t count = pn_data_get_map(data);
+    pn_data_enter(data);
+    for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
+        // write(const Variant::Map&) emits string keys while connection
+        // properties use symbol keys, so accept either form.
+        std::string key = (pn_data_type(data) == PN_STRING)
+            ? str(pn_data_get_string(data))
+            : str(pn_data_get_symbol(data));
+        if (!pn_data_next(data)) break; // key with no value: stop decoding
+        qpid::types::Variant e;
+        if (read(e)) value[key]= e;
+    }
+    pn_data_exit(data);
+}
+
+// Decode the array at the current node into `value`. Every element is read
+// using the array's declared element type; elements of an unsupported type
+// are skipped.
+void PnData::readArray(qpid::types::Variant::List& value)
+{
+    const size_t total = pn_data_get_array(data);
+    const pn_type_t elementType = pn_data_get_array_type(data);
+    pn_data_enter(data);
+    size_t index = 0;
+    while (index < total && pn_data_next(data)) {
+        qpid::types::Variant element;
+        if (read(elementType, element)) {
+            value.push_back(element);
+        }
+        ++index;
+    }
+    pn_data_exit(data);
+}
+
+// Decode the current node of `data` into `value`, treating it as `type`.
+//
+// Returns true when the type was converted. Returns false for described
+// values, the decimal types and anything unrecognised, in which case `value`
+// is left untouched. Binary, string and symbol values are tagged with the
+// BINARY, UTF8 and ASCII encodings respectively.
+bool PnData::read(pn_type_t type, qpid::types::Variant& value)
+{
+    switch (type) {
+      case PN_NULL:
+        // Only reset when needed so an already-void value is not reassigned.
+        if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
+        return true;
+      case PN_BOOL:
+        value = pn_data_get_bool(data);
+        return true;
+      case PN_UBYTE:
+        value = pn_data_get_ubyte(data);
+        return true;
+      case PN_BYTE:
+        value = pn_data_get_byte(data);
+        return true;
+      case PN_USHORT:
+        value = pn_data_get_ushort(data);
+        return true;
+      case PN_SHORT:
+        value = pn_data_get_short(data);
+        return true;
+      case PN_UINT:
+        value = pn_data_get_uint(data);
+        return true;
+      case PN_INT:
+        value = pn_data_get_int(data);
+        return true;
+      case PN_CHAR:
+        value = pn_data_get_char(data);
+        return true;
+      case PN_ULONG:
+        value = pn_data_get_ulong(data);
+        return true;
+      case PN_LONG:
+        value = pn_data_get_long(data);
+        return true;
+      case PN_TIMESTAMP:
+        value = pn_data_get_timestamp(data);
+        return true;
+      case PN_FLOAT:
+        value = pn_data_get_float(data);
+        return true;
+      case PN_DOUBLE:
+        value = pn_data_get_double(data);
+        return true;
+      case PN_UUID:
+        value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
+        return true;
+      case PN_BINARY:
+        value = str(pn_data_get_binary(data));
+        value.setEncoding(qpid::types::encodings::BINARY);
+        return true;
+      case PN_STRING:
+        value = str(pn_data_get_string(data));
+        value.setEncoding(qpid::types::encodings::UTF8);
+        return true;
+      case PN_SYMBOL:
+        // A symbol node must be read with the symbol accessor; the string
+        // accessor was used here previously.
+        value = str(pn_data_get_symbol(data));
+        value.setEncoding(qpid::types::encodings::ASCII);
+        return true;
+      case PN_LIST:
+        value = qpid::types::Variant::List();
+        readList(value.asList());
+        return true;
+      case PN_MAP:
+        value = qpid::types::Variant::Map();
+        readMap(value.asMap());
+        return true;
+      case PN_ARRAY:
+        // Arrays are surfaced as plain lists.
+        value = qpid::types::Variant::List();
+        readArray(value.asList());
+        return true;
+      case PN_DESCRIBED:
+      case PN_DECIMAL32:
+      case PN_DECIMAL64:
+      case PN_DECIMAL128:
+      default:
+        return false;
+    }
+
+}
+
+// Wrap a std::string as a pn_bytes_t. No copy is made: the result points at
+// the string's own buffer, so `s` must stay alive while the result is used.
+pn_bytes_t PnData::str(const std::string& s)
+{
+    pn_bytes_t bytes;
+    bytes.size = s.size();
+    bytes.start = const_cast<char*>(s.data());
+    return bytes;
+}
+
+// Copy the bytes referenced by a pn_bytes_t into a new std::string.
+std::string PnData::str(const pn_bytes_t& in)
+{
+    std::string copy(in.start, in.size);
+    return copy;
+}
+
+}}} // namespace qpid::messaging::amqp
Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h?rev=1585588&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h Mon Apr 7 21:22:55 2014
@@ -0,0 +1,60 @@
+#ifndef QPID_MESSAGING_AMQP_PNDATA_H
+#define QPID_MESSAGING_AMQP_PNDATA_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+/**
+ * Helper class to read/write messaging types to/from pn_data_t.
+ */
+class PnData
+{
+ public:
+ // Wrap an existing pn_data_t. Only the pointer is stored; this class
+ // declares no destructor, so the pointer is not released here.
+ PnData(pn_data_t* d) : data(d) {}
+
+ // Encode a Variant, map or list into the wrapped pn_data_t.
+ void write(const types::Variant& value);
+ void write(const types::Variant::Map& map);
+ void write(const types::Variant::List& list);
+
+ // Decode the current node into value, either as the given type or as the
+ // node's own type. Both return false for types that cannot be converted.
+ bool read(pn_type_t type, types::Variant& value);
+ bool read(types::Variant& value);
+ // Decode the container at the current node into value.
+ void readList(types::Variant::List& value);
+ void readMap(types::Variant::Map& value);
+ void readArray(types::Variant::List& value);
+
+ // Convert between std::string and pn_bytes_t. The pn_bytes_t result
+ // points into the given string; the std::string result is a copy.
+ static pn_bytes_t str(const std::string&);
+ static std::string str(const pn_bytes_t&);
+
+ private:
+ // The proton data object that every read and write operates on.
+ pn_data_t* data;
+};
+}}} // namespace messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_PNDATA_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Mon Apr 7 21:22:55 2014
@@ -21,19 +21,47 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
-from qpid import connection, messaging, util
+from qpid import connection, util
from qpid.compat import format_exc
from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
+from qpidtoollibs import BrokerAgent
-try: import qmf.console
-except: print "Cannot import module qmf.console, skipping tests"; exit(0);
-
+# NOTE: Always import native client qpid.messaging, import swigged client
+# qpid_messaging if possible. qpid_messaging is set to None if not available.
+#
+# qm is set to qpid_messaging if it is available, qpid.messaging if not.
+# Use qm.X to specify names from the default messaging module.
+#
+# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading.
+#
+# BrokerTest can be configured to determine which protocol is used by default:
+#
+# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client
+# is being used, amqp0-10 if native client is being used.
+#
+# The configured defaults can be over-ridden on BrokerTest.connect and some
+# other methods by specifying native=True|False and protocol="amqpX"
+#
+
+import qpid.messaging
+qm = qpid.messaging
+qpid_messaging = None
+if not os.environ.get("QPID_PY_NO_SWIG"):
+ try:
+ import qpid_messaging
+ from qpid.datatypes import uuid4
+ qm = qpid_messaging
+ # Silence warnings from swigged messaging library unless enabled in environment.
+ if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ qm.Logger.configure(["--log-enable=error"])
+ except ImportError:
+ print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
-log = getLogger("qpid.brokertest")
+log = getLogger("brokertest")
# Values for expected outcome of process at end of test
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
@@ -149,7 +177,7 @@ class Popen(subprocess.Popen):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
- def stop(self): # Clean up at end of test.
+ def teardown(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure its dead
@@ -253,14 +281,16 @@ class Broker(Popen):
self.test = test
self._port=port
+ args = copy(args)
+ if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib]
if BrokerTest.store_lib and not test_store:
- args = args + ['--load-module', BrokerTest.store_lib]
+ args += ['--load-module', BrokerTest.store_lib]
if BrokerTest.sql_store_lib:
- args = args + ['--load-module', BrokerTest.sql_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
if BrokerTest.sql_clfs_store_lib:
- args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_clfs_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args
if not "--auth" in args: cmd.append("--auth=no")
if wait != None:
@@ -288,13 +318,11 @@ class Broker(Popen):
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
Popen.__init__(self, cmd, expect, stdout=PIPE)
- test.cleanup_stop(self)
+ test.teardown_add(self)
self._host = "127.0.0.1"
- log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._agent = None
- def startQmf(self, handler=None):
- self.qmf_session = qmf.console.Session(handler)
- self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
def host(self): return self._host
@@ -310,22 +338,25 @@ class Broker(Popen):
def unexpected(self,msg):
raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
- def connect(self, timeout=5, **kwargs):
- """New API connection to the broker."""
- return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs)
-
- def connect_old(self):
- """Old API connection to the broker."""
- socket = qpid.util.connect(self.host(),self.port())
- connection = qpid.connection.Connection (sock=socket)
- connection.start()
- return connection;
+ def connect(self, timeout=5, native=False, **kwargs):
+ """New API connection to the broker.
+ @param native if True force use of the native qpid.messaging client
+ even if swig client is available.
+ """
+ if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol)
+ if native: connection_class = qpid.messaging.Connection
+ else: connection_class = qm.Connection
+ return connection_class.establish(self.host_port(), timeout=timeout, **kwargs)
+
+ @property
+ def agent(self, **kwargs):
+ """Return a BrokerAgent for this broker"""
+ if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs))
+ return self._agent
+
def declare_queue(self, queue):
- c = self.connect_old()
- s = c.session(str(qpid.datatypes.uuid4()))
- s.queue_declare(queue=queue)
- c.close()
+ self.agent.addQueue(queue)
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
@@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, tr
contents = []
try:
while True: contents.append(transform(r.fetch(timeout=timeout)))
- except messaging.Empty: pass
+ except qm.Empty: pass
finally: r.close()
return contents
@@ -451,30 +482,42 @@ class BrokerTest(TestCase):
def configure(self, config): self.config=config
def setUp(self):
- outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
+ defs = self.config.defines
+ outdir = defs.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
os.makedirs(self.dir)
os.chdir(self.dir)
- self.stopem = [] # things to stop at end of test
+ self.teardown_list = [] # things to tear down at end of test
+
+ self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10")
+ self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
+
def tearDown(self):
err = []
- for p in self.stopem:
- try: p.stop()
- except Exception, e: err.append(str(e))
- self.stopem = [] # reset in case more processes start
+ self.teardown_list.reverse() # Tear down in reverse order
+ for p in self.teardown_list:
+ log.debug("Tearing down %s", p)
+ try:
+ # Call the first of the methods that is available on p.
+ for m in ["teardown", "close"]:
+ a = getattr(p, m, None)
+ if a: a(); break
+ else: raise Exception("Don't know how to tear down %s", p)
+ except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e)))
+ self.teardown_list = [] # reset in case more processes start
os.chdir(self.rootdir)
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
- def cleanup_stop(self, stopable):
- """Call thing.stop at end of test"""
- self.stopem.append(stopable)
+ def teardown_add(self, thing):
+ """Call thing.teardown() or thing.close() at end of test"""
+ self.teardown_list.append(thing)
def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
- self.cleanup_stop(p)
+ self.teardown_add(p)
return p
def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
@@ -490,6 +533,11 @@ class BrokerTest(TestCase):
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
+ def protocol_option(self, connection_options=""):
+ if "protocol" in connection_options: return connection_options
+ else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol]))
+
+
def join(thread, timeout=30):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
@@ -524,7 +572,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -533,7 +581,7 @@ class NumberedSender(Thread):
cmd = ["qpid-send",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--content-stdin"
] + args
if failover_updates: cmd += ["--failover-updates"]
@@ -592,7 +640,7 @@ class NumberedReceiver(Thread):
"""
def __init__(self, broker, sender=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -601,7 +649,7 @@ class NumberedReceiver(Thread):
cmd = ["qpid-receive",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--forever"
]
if failover_updates: cmd += [ "--failover-updates" ]
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Apr 7 21:22:55 2014
@@ -20,8 +20,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from qpid.datatypes import uuid4, UUID
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
@@ -44,7 +42,7 @@ class LogLevel:
class QmfAgent(object):
"""Access to a QMF broker agent."""
def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
+ self._connection = qm.Connection.establish(
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
@@ -105,9 +103,9 @@ class HaPort:
self.port = self.socket.getsockname()[1]
self.fileno = self.socket.fileno()
self.stopped = False
- test.cleanup_stop(self) # Stop during test.tearDown
+ test.teardown_add(self) # Stop during test.tearDown
- def stop(self): # Called in tearDown
+ def teardown(self): # Called in tearDown
if not self.stopped:
self.stopped = True
self.socket.shutdown(socket.SHUT_RDWR)
@@ -180,6 +178,7 @@ acl allow all all
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ @property
def agent(self):
if not self._agent:
cred = self.client_credentials
@@ -190,7 +189,7 @@ acl allow all all
return self._agent
def qmf(self):
- hb = self.agent().getHaBroker()
+ hb = self.agent.getHaBroker()
hb.update()
return hb
@@ -203,19 +202,19 @@ acl allow all all
try:
self._status = self.ha_status()
return self._status == status;
- except ConnectionError: return False
+ except qm.ConnectionError: return False
assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
- def wait_queue(self, queue, timeout=1):
+ def wait_queue(self, queue, timeout=1, msg="wait_queue"):
""" Wait for queue to be visible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
- def wait_no_queue(self, queue, timeout=1):
+ def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
""" Wait for queue to be invisible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
# TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -273,12 +272,12 @@ acl allow all all
def assert_connect_fail(self):
try:
self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
+ self.test.fail("Expected qm.ConnectionError")
+ except qm.ConnectionError: pass
def try_connect(self):
try: return self.connect()
- except ConnectionError: return None
+ except qm.ConnectionError: return None
def ready(self, *args, **kwargs):
if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
@@ -286,7 +285,7 @@ acl allow all all
return Broker.ready(self, *args, **kwargs)
def kill(self, final=True):
- if final: self.ha_port.stop()
+ if final: self.ha_port.teardown()
self._agent = None
return Broker.kill(self)
@@ -355,9 +354,9 @@ class HaCluster(object):
b.set_brokers_url(self.url)
b.set_public_url(self.url)
- def connect(self, i):
+ def connect(self, i, **kwargs):
"""Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
@@ -393,7 +392,7 @@ def wait_address(session, address):
"""Wait for an address to become valid."""
def check():
try: session.sender(address); return True
- except NotFound: return False
+ except qm.NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
def valid_address(session, address):
@@ -401,6 +400,6 @@ def valid_address(session, address):
try:
session.receiver(address)
return True
- except NotFound: return False
+ except qm.NotFound: return False
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org