You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/04/07 23:22:56 UTC

svn commit: r1585588 [1/2] - in /qpid/trunk/qpid: cpp/bindings/qpid/python/ cpp/include/qpid/ cpp/include/qpid/messaging/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp/ cpp/src/qpid/ha/ cpp/src/qpid/messaging/ cpp/src/qpid/messaging/amqp/ cpp/...

Author: aconway
Date: Mon Apr  7 21:22:55 2014
New Revision: 1585588

URL: http://svn.apache.org/r1585588
Log:
QPID-5560: HA tests do not use AMQP 1.0

The HA tests were using only AMQP 0-10.
Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available)
Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation.

Summary of changes:
- brokertest.py: configurable support for swig vs. native and amqp0-10 vs. 1.0
  - default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise

- qpidtoollibs/broker.py: enable use of swig client with BrokerAgent

- Swig python client:
  - support for passing client_properties/properties.
    - expose AddressHelper pn_data read/write as PnData helper class
  - set sender/receiver capacity on creation
  - limited disposition support - rejected messages.
  - support for additional timeout parameters
  - expose messaging::Logger, allow log configuration to be set from python.

- ha_tests.py:
  - bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF.
  - pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive)
  - TX tests force use of 0-10 protocol (but still with Swig client if enabled.)

- Broker fixes:
  - Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue.
  - broker::amqp::Session was always setting an exclusive owner in createQueue

Added:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i
    qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h
    qpid/trunk/qpid/cpp/include/qpid/qpid.i
    qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i
    qpid/trunk/qpid/cpp/src/amqp.cmake
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/interlink_tests.py
    qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
    qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py
    qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
    qpid/trunk/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
    qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
    qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py

Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i (original)
+++ qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i Mon Apr  7 21:22:55 2014
@@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
  * don't even know why there is a non-const version of the method. */
 %rename(opened) qpid::messaging::Connection::isOpen();
 %rename(_close) qpid::messaging::Connection::close();
-%rename(receiver) qpid::messaging::Session::createReceiver;
+%rename(_receiver) qpid::messaging::Session::createReceiver;
 %rename(_sender) qpid::messaging::Session::createSender;
 %rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool);
 %rename(_acknowledge_msg) qpid::messaging::Session::acknowledge(
@@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
 %rename(_getTtl) qpid::messaging::Message::getTtl;
 %rename(_setTtl) qpid::messaging::Message::setTtl;
 
+%rename(_sync) qpid::messaging::Session::sync;
+
+// Capitalize constant names correctly for python
+%rename(TRACE) qpid::messaging::trace;
+%rename(DEBUG) qpid::messaging::debug;
+%rename(INFO) qpid::messaging::info;
+%rename(NOTICE) qpid::messaging::notice;
+%rename(WARNING) qpid::messaging::warning;
+%rename(ERROR) qpid::messaging::error;
+%rename(CRITICAL) qpid::messaging::critical;
 
 %include "qpid/qpid.i"
 
@@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
 
     %pythoncode %{
         @staticmethod
-        def establish(url=None, **options) :
+        def establish(url=None, timeout=None, **options) :
+            if timeout and "reconnect-timeout" not in options:
+                options["reconnect-timeout"] = timeout
             conn = Connection(url, **options)
             conn.open()
             return conn
     %}
 }
 
+%pythoncode %{
+    # Disposition class from messaging/message.py
+    class Disposition:
+        def __init__(self, type, **options):
+            self.type = type
+            self.options = options
+
+        def __repr__(self):
+            args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()]
+            return "Disposition(%s)" % ", ".join(args)
+
+    # Constants from messaging/constants.py
+    __SELF__ = object()
+
+    class Constant:
+
+      def __init__(self, name, value=__SELF__):
+        self.name = name
+        if value is __SELF__:
+          self.value = self
+        else:
+          self.value = value
+
+      def __repr__(self):
+        return self.name
+
+    AMQP_PORT = 5672
+    AMQPS_PORT = 5671
+
+    UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+    REJECTED = Constant("REJECTED")
+    RELEASED = Constant("RELEASED")
+%}
+
 %extend qpid::messaging::Session {
     %pythoncode %{
          def acknowledge(self, message=None, disposition=None, sync=True) :
-             if disposition :
-                 raise Exception("SWIG does not support dispositions yet. Use "
-                                 "Session.reject and Session.release instead")
              if message :
-                 self._acknowledge_msg(message, sync)
+                 if disposition is None: self._acknowledge_msg(message, sync)
+                 # FIXME aconway 2014-02-11: the following does not respect the sync flag.
+                 elif disposition.type == REJECTED: self.reject(message)
+                 elif disposition.type == RELEASED: self.release(message)
              else :
+                 if disposition : # FIXME aconway 2014-02-11: support this
+                     raise Exception("SWIG does not support dispositions yet. Use "
+                                     "Session.reject and Session.release instead")
                  self._acknowledge_all(sync)
 
          __swig_getmethods__["connection"] = getConnection
          if _newclass: connection = property(getConnection)
 
-         def sender(self, target, **options) :
-            s = self._sender(target)
-            s._setDurable(options.get("durable"))
-            return s
+         def receiver(self, source, capacity=None):
+             r = self._receiver(source)
+             if capacity is not None: r.capacity = capacity
+             return r
+
+         def sender(self, target, durable=None, capacity=None) :
+             s = self._sender(target)
+             if capacity is not None: s.capacity = capacity
+             s._setDurable(durable)
+             return s
 
          def next_receiver(self, timeout=None) :
              if timeout is None :
@@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
                  # Python API uses timeouts in seconds,
                  # but C++ API uses milliseconds
                  return self._next_receiver(Duration(int(1000*timeout)))
+
+         def sync(self, timeout=None):
+             if timeout == 0: self._sync(False) # Non-blocking sync
+             else: self._sync(True)             # Blocking sync, C++ has no timeout.
+
     %}
 }
 
@@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
                  message.durable = self.durable
              return self._send(message, sync)
 
+         def sync(self, timeout=None): self.session.sync(timeout)
+
          __swig_getmethods__["capacity"] = getCapacity
          __swig_setmethods__["capacity"] = setCapacity
          if _newclass: capacity = property(getCapacity, setCapacity)

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Logger.h Mon Apr  7 21:22:55 2014
@@ -103,7 +103,7 @@ public:
      * --log-time ("on"|"off|"0"|"1")
      * --log-level ("on"|"off|"0"|"1")
      * --log-source ("on"|"off|"0"|"1")
-     * --log-thread ("on"|"off|"0"|"1")
+    * --log-thread ("on"|"off|"0"|"1")
      * --log-function ("on"|"off|"0"|"1")
      * --log-hires-timestamp ("on"|"off|"0"|"1")
      *

Modified: qpid/trunk/qpid/cpp/include/qpid/qpid.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/qpid.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/qpid.i (original)
+++ qpid/trunk/qpid/cpp/include/qpid/qpid.i Mon Apr  7 21:22:55 2014
@@ -53,6 +53,7 @@ struct mystr
 #include <qpid/messaging/Message.h>
 #include <qpid/messaging/Duration.h>
 #include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/messaging/Logger.h>
 
 //
 // Wrapper functions for map-decode and list-decode.  This allows us to avoid
@@ -84,6 +85,7 @@ qpid::types::Variant::List& decodeList(c
 %include <qpid/messaging/Session.h>
 %include <qpid/messaging/Connection.h>
 %include <qpid/messaging/FailoverUpdates.h>
+%include <qpid/messaging/Logger.h>
 
 qpid::types::Variant::Map& decodeMap(const qpid::messaging::Message&);
 qpid::types::Variant::List& decodeList(const qpid::messaging::Message&);

Modified: qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i (original)
+++ qpid/trunk/qpid/cpp/include/qpid/swig_python_typemaps.i Mon Apr  7 21:22:55 2014
@@ -452,3 +452,10 @@ typedef int Py_ssize_t;
     $1 = PyInt_Check($input) ? 1 : 0;
 }
 
+
+/**
+ * argc,argv as python list
+ */
+
+%include <argcargv.i>
+%apply (int ARGC, char **ARGV) { (int argc, const char *argv[]) }

Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Mon Apr  7 21:22:55 2014
@@ -120,6 +120,8 @@ if (BUILD_AMQP)
          qpid/messaging/amqp/ConnectionHandle.cpp
          qpid/messaging/amqp/DriverImpl.h
          qpid/messaging/amqp/DriverImpl.cpp
+         qpid/messaging/amqp/PnData.h
+         qpid/messaging/amqp/PnData.cpp
          qpid/messaging/amqp/ReceiverContext.h
          qpid/messaging/amqp/ReceiverContext.cpp
          qpid/messaging/amqp/ReceiverHandle.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Apr  7 21:22:55 2014
@@ -1292,12 +1292,13 @@ const std::string Broker::TCP_TRANSPORT(
 
 std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
     const std::string& name,
-    const QueueSettings& settings,
+    const QueueSettings& constSettings,
     const OwnershipToken* owner,
     const std::string& alternateExchange,
     const std::string& userId,
     const std::string& connectionId)
 {
+    QueueSettings settings(constSettings); // So we can modify them
     if (acl) {
         std::map<acl::Property, std::string> params;
         params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
@@ -1335,6 +1336,10 @@ std::pair<boost::shared_ptr<Queue>, bool
         if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
     }
 
+    // Identify queues that won't survive a failover: exclusive, auto-delete with no delay.
+    if (owner && settings.autodelete && !settings.autoDeleteDelay)
+        settings.isTemporary = true;
+
     std::pair<Queue::shared_ptr, bool> result =
         queues.declare(name, settings, alternate, false/*recovering*/,
                        owner, connectionId, userId);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Apr  7 21:22:55 2014
@@ -521,7 +521,7 @@ void Queue::markInUse(bool controlling)
     else users.addOther();
 }
 
-void Queue::releaseFromUse(bool controlling)
+void Queue::releaseFromUse(bool controlling, bool doDelete)
 {
     bool trydelete;
     if (controlling) {
@@ -533,7 +533,7 @@ void Queue::releaseFromUse(bool controll
         users.removeOther();
         trydelete = isUnused(locker);
     }
-    if (trydelete) scheduleAutoDelete();
+    if (trydelete && doDelete) scheduleAutoDelete();
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
@@ -577,11 +577,13 @@ void Queue::consume(Consumer::shared_ptr
     if (mgmtObject != 0 && c->isCounted()) {
         mgmtObject->inc_consumerCount();
     }
-    ManagementAgent* agent = broker->getManagementAgent();
-    if (agent) {
-        agent->raiseEvent(
-            _qmf::EventSubscribe(connectionId, userId, name,
-                                 c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+    if (broker) {
+        ManagementAgent* agent = broker->getManagementAgent();
+        if (agent) {
+            agent->raiseEvent(
+                _qmf::EventSubscribe(connectionId, userId, name,
+                                     c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+        }
     }
 }
 
@@ -589,6 +591,7 @@ void Queue::cancel(Consumer::shared_ptr 
 {
     removeListener(c);
     if(c->isCounted())
+
     {
         bool unused;
         {
@@ -605,12 +608,12 @@ void Queue::cancel(Consumer::shared_ptr 
         if (mgmtObject != 0) {
             mgmtObject->dec_consumerCount();
         }
-        if (unused && settings.autodelete) {
-            scheduleAutoDelete();
-        }
+        if (unused && settings.autodelete) scheduleAutoDelete();
+    }
+    if (broker) {
+        ManagementAgent* agent = broker->getManagementAgent();
+        if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
     }
-    ManagementAgent* agent = broker->getManagementAgent();
-    if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
 }
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Apr  7 21:22:55 2014
@@ -363,7 +363,7 @@ class Queue : public boost::enable_share
      * be created.
      */
     QPID_BROKER_EXTERN void markInUse(bool controlling=false);
-    QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false);
+    QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false, bool doDelete=true);
 
     QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0,  //defaults to all messages
                    boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Apr  7 21:22:55 2014
@@ -283,8 +283,6 @@ void SessionAdapter::QueueHandlerImpl::d
         } catch (const qpid::types::Exception& e) {
             throw InvalidArgumentException(e.what());
         }
-        // Identify queues that won't survive a failover.
-        settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay;
 
         std::pair<Queue::shared_ptr, bool> queue_created =
             getBroker().createQueue(name, settings,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Apr  7 21:22:55 2014
@@ -264,7 +264,7 @@ Session::ResolvedNode Session::resolve(c
                     QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
                 }
                 std::pair<boost::shared_ptr<Queue>, bool> result
-                    = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+                    = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
                 node.queue = result.first;
                 node.created = result.second;
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Apr  7 21:22:55 2014
@@ -156,7 +156,7 @@ QueueReplicator::QueueReplicator(HaBroke
     args.setString(QPID_REPLICATE, printable(NONE).str());
     setArgs(args);
     // Don't allow backup queues to auto-delete, primary decides when to delete.
-    if (q->isAutoDelete()) q->markInUse();
+    if (q->isAutoDelete()) q->markInUse(false);
 
     dispatch[DequeueEvent::KEY] =
         boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
@@ -352,13 +352,13 @@ void QueueReplicator::promoted() {
         queue->getMessageInterceptors().add(
             boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
         // Process auto-deletes
-        if (queue->isAutoDelete() && subscribed) {
+        if (queue->isAutoDelete()) {
             // Make a temporary shared_ptr to prevent premature deletion of queue.
             // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
             // which could delete the queue while it's still running it's destroyed logic.
             boost::shared_ptr<Queue> q(queue);
-            q->releaseFromUse();
-            q->scheduleAutoDelete();
+            // See markInUse in ctor: release but don't delete if never used.
+            q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/);
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Apr  7 21:22:55 2014
@@ -64,8 +64,6 @@ class ReplicationTest
 
     // Calculate level for objects that may not have replication set,
     // including auto-delete/exclusive settings.
-    ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
-    ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
     ReplicateLevel useLevel(const broker::Queue&) const;
     ReplicateLevel useLevel(const broker::Exchange&) const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp Mon Apr  7 21:22:55 2014
@@ -119,6 +119,8 @@ void ConnectionOptions::set(const std::s
         nestAnnotations = value;
     } else if (name == "set-to-on-send" || name == "set_to_on_send") {
         setToOnSend = value;
+    } else if (name == "properties" || name == "client-properties" || name == "client_properties") {
+        properties = value.asMap();
     } else {
         throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionOptions.h Mon Apr  7 21:22:55 2014
@@ -47,6 +47,7 @@ struct ConnectionOptions : qpid::client:
     std::string identifier;
     bool nestAnnotations;
     bool setToOnSend;
+    std::map<std::string, qpid::types::Variant> properties;
 
     QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
     QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Mon Apr  7 21:22:55 2014
@@ -18,6 +18,8 @@
  * under the License.
  *
  */
+
+#include "PnData.h"
 #include "qpid/messaging/amqp/AddressHelper.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/AddressImpl.h"
@@ -33,6 +35,7 @@ extern "C" {
 #include <proton/engine.h>
 }
 
+
 namespace qpid {
 namespace messaging {
 namespace amqp {
@@ -239,179 +242,6 @@ bool replace(Variant::Map& map, const st
     }
 }
 
-void write(pn_data_t* data, const Variant& value);
-
-void write(pn_data_t* data, const Variant::Map& map)
-{
-    pn_data_put_map(data);
-    pn_data_enter(data);
-    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
-        pn_data_put_string(data, convert(i->first));
-        write(data, i->second);
-    }
-    pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant::List& list)
-{
-    pn_data_put_list(data);
-    pn_data_enter(data);
-    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
-        write(data, *i);
-    }
-    pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant& value)
-{
-    switch (value.getType()) {
-      case qpid::types::VAR_VOID:
-        pn_data_put_null(data);
-        break;
-      case qpid::types::VAR_BOOL:
-        pn_data_put_bool(data, value.asBool());
-        break;
-      case qpid::types::VAR_UINT64:
-        pn_data_put_ulong(data, value.asUint64());
-        break;
-      case qpid::types::VAR_INT64:
-        pn_data_put_long(data, value.asInt64());
-        break;
-      case qpid::types::VAR_DOUBLE:
-        pn_data_put_double(data, value.asDouble());
-        break;
-      case qpid::types::VAR_STRING:
-        pn_data_put_string(data, convert(value.asString()));
-        break;
-      case qpid::types::VAR_MAP:
-        write(data, value.asMap());
-        break;
-      case qpid::types::VAR_LIST:
-        write(data, value.asList());
-        break;
-      default:
-        break;
-    }
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value);
-bool read(pn_data_t* data, qpid::types::Variant& value)
-{
-    return read(data, pn_data_type(data), value);
-}
-void readList(pn_data_t* data, qpid::types::Variant::List& value)
-{
-    size_t count = pn_data_get_list(data);
-    pn_data_enter(data);
-    for (size_t i = 0; i < count && pn_data_next(data); ++i) {
-        qpid::types::Variant e;
-        if (read(data, e)) value.push_back(e);
-    }
-    pn_data_exit(data);
-}
-void readMap(pn_data_t* data, qpid::types::Variant::Map& value)
-{
-    size_t count = pn_data_get_list(data);
-    pn_data_enter(data);
-    for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
-        std::string key = convert(pn_data_get_symbol(data));
-        pn_data_next(data);
-        qpid::types::Variant e;
-        if (read(data, e)) value[key]= e;
-    }
-    pn_data_exit(data);
-}
-void readArray(pn_data_t* data, qpid::types::Variant::List& value)
-{
-    size_t count = pn_data_get_array(data);
-    pn_type_t type = pn_data_get_array_type(data);
-    pn_data_enter(data);
-    for (size_t i = 0; i < count && pn_data_next(data); ++i) {
-        qpid::types::Variant e;
-        if (read(data, type, e)) value.push_back(e);
-    }
-    pn_data_exit(data);
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value)
-{
-    switch (type) {
-      case PN_NULL:
-        if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
-        return true;
-      case PN_BOOL:
-        value = pn_data_get_bool(data);
-        return true;
-      case PN_UBYTE:
-        value = pn_data_get_ubyte(data);
-        return true;
-      case PN_BYTE:
-        value = pn_data_get_byte(data);
-        return true;
-      case PN_USHORT:
-        value = pn_data_get_ushort(data);
-        return true;
-      case PN_SHORT:
-        value = pn_data_get_short(data);
-        return true;
-      case PN_UINT:
-        value = pn_data_get_uint(data);
-        return true;
-      case PN_INT:
-        value = pn_data_get_int(data);
-        return true;
-      case PN_CHAR:
-        value = pn_data_get_char(data);
-        return true;
-      case PN_ULONG:
-        value = pn_data_get_ulong(data);
-        return true;
-      case PN_LONG:
-        value = pn_data_get_long(data);
-        return true;
-      case PN_TIMESTAMP:
-        value = pn_data_get_timestamp(data);
-        return true;
-      case PN_FLOAT:
-        value = pn_data_get_float(data);
-        return true;
-      case PN_DOUBLE:
-        value = pn_data_get_double(data);
-        return true;
-      case PN_UUID:
-        value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
-        return true;
-      case PN_BINARY:
-        value = convert(pn_data_get_binary(data));
-        value.setEncoding(qpid::types::encodings::BINARY);
-        return true;
-      case PN_STRING:
-        value = convert(pn_data_get_string(data));
-        value.setEncoding(qpid::types::encodings::UTF8);
-        return true;
-      case PN_SYMBOL:
-        value = convert(pn_data_get_string(data));
-        value.setEncoding(qpid::types::encodings::ASCII);
-        return true;
-      case PN_LIST:
-        value = qpid::types::Variant::List();
-        readList(data, value.asList());
-        return true;
-        break;
-      case PN_MAP:
-        value = qpid::types::Variant::Map();
-        readMap(data, value.asMap());
-        return true;
-      case PN_ARRAY:
-        value = qpid::types::Variant::List();
-        readArray(data, value.asList());
-        return true;
-      case PN_DESCRIBED:
-      case PN_DECIMAL32:
-      case PN_DECIMAL64:
-      case PN_DECIMAL128:
-      default:
-        return false;
-    }
-
-}
-
 const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
 const uint32_t DEFAULT_TIMEOUT(0);
 }
@@ -680,9 +510,9 @@ void AddressHelper::checkAssertion(pn_te
                             requested.erase(j->first);
                         }
                     } else if (key == AUTO_DELETE) {
-                        read(data, v);
+                        PnData(data).read(v);
                         isAutoDeleted = v.asBool();
-                    } else if (j != requested.end() && (read(data, v) && v.asString() == j->second.asString())) {
+                    } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) {
                         requested.erase(j->first);
                     }
                 }
@@ -815,7 +645,7 @@ void AddressHelper::configure(pn_link_t*
                 } else {
                     pn_data_put_ulong(filter, i->descriptorCode);
                 }
-                write(filter, i->value);
+                PnData(filter).write(i->value);
                 pn_data_exit(filter);
             }
             pn_data_exit(filter);
@@ -902,7 +732,7 @@ void AddressHelper::setNodeProperties(pn
                 putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
             } else {
                 pn_data_put_symbol(data, convert(i->first));
-                write(data, i->second);
+                PnData(data).write(i->second);
             }
         }
         pn_data_exit(data);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Apr  7 21:22:55 2014
@@ -20,6 +20,7 @@
  */
 #include "ConnectionContext.h"
 #include "DriverImpl.h"
+#include "PnData.h"
 #include "ReceiverContext.h"
 #include "Sasl.h"
 #include "SenderContext.h"
@@ -49,6 +50,8 @@ extern "C" {
 namespace qpid {
 namespace messaging {
 namespace amqp {
+using types::Variant;
+
 namespace {
 
 //remove conditional when 0.5 is no longer supported
@@ -872,13 +875,6 @@ namespace {
 const std::string CLIENT_PROCESS_NAME("qpid.client_process");
 const std::string CLIENT_PID("qpid.client_pid");
 const std::string CLIENT_PPID("qpid.client_ppid");
-pn_bytes_t convert(const std::string& s)
-{
-    pn_bytes_t result;
-    result.start = const_cast<char*>(s.data());
-    result.size = s.size();
-    return result;
-}
 }
 void ConnectionContext::setProperties()
 {
@@ -886,15 +882,21 @@ void ConnectionContext::setProperties()
     pn_data_put_map(data);
     pn_data_enter(data);
 
-    pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+    pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
     std::string processName = sys::SystemInfo::getProcessName();
-    pn_data_put_string(data, convert(processName));
+    pn_data_put_string(data, PnData::str(processName));
 
-    pn_data_put_symbol(data, convert(CLIENT_PID));
+    pn_data_put_symbol(data, PnData::str(CLIENT_PID));
     pn_data_put_int(data, sys::SystemInfo::getProcessId());
 
-    pn_data_put_symbol(data, convert(CLIENT_PPID));
+    pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
     pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+
+    for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i)
+    {
+        pn_data_put_symbol(data, PnData::str(i->first));
+        PnData(data).write(i->second);
+    }
     pn_data_exit(data);
 }
 

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp?rev=1585588&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp Mon Apr  7 21:22:55 2014
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PnData.h"
+#include "qpid/types/encodings.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using types::Variant;
+
+void PnData::write(const Variant::Map& map) // Append map to data as a single AMQP map node.
+{
+    pn_data_put_map(data);
+    pn_data_enter(data); // the entries that follow are written inside the map node
+    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        pn_data_put_string(data, str(i->first)); // keys are emitted as strings, not symbols
+        write(i->second); // value is encoded according to its Variant type
+    }
+    pn_data_exit(data); // leave the map node again
+}
+void PnData::write(const Variant::List& list) // Append list to data as a single AMQP list node.
+{
+    pn_data_put_list(data);
+    pn_data_enter(data); // the elements that follow are written inside the list node
+    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+        write(*i); // each element is encoded according to its Variant type
+    }
+    pn_data_exit(data); // leave the list node again
+}
+void PnData::write(const Variant& value) // Append value to data, picking the AMQP type from value.getType().
+{
+    switch (value.getType()) {
+      case qpid::types::VAR_VOID:
+        pn_data_put_null(data);
+        break;
+      case qpid::types::VAR_BOOL:
+        pn_data_put_bool(data, value.asBool());
+        break;
+      case qpid::types::VAR_UINT64:
+        pn_data_put_ulong(data, value.asUint64());
+        break;
+      case qpid::types::VAR_INT64:
+        pn_data_put_long(data, value.asInt64());
+        break;
+      case qpid::types::VAR_DOUBLE:
+        pn_data_put_double(data, value.asDouble());
+        break;
+      case qpid::types::VAR_STRING:
+        pn_data_put_string(data, str(value.asString()));
+        break;
+      case qpid::types::VAR_MAP:
+        write(value.asMap()); // recurses through the Map overload
+        break;
+      case qpid::types::VAR_LIST:
+        write(value.asList()); // recurses through the List overload
+        break;
+      default:
+        break; // NOTE(review): any other Variant type is skipped and nothing is written; inside a map that leaves a key without a value -- confirm intended
+    }
+}
+
+bool PnData::read(qpid::types::Variant& value) // Read the node data is positioned on into value.
+{
+    return read(pn_data_type(data), value); // decode using the type pn_data_type() reports; false when that type is not handled
+}
+
+void PnData::readList(qpid::types::Variant::List& value) // Append the elements of the list node at the cursor to value.
+{
+    size_t count = pn_data_get_list(data); // element count reported for the list node
+    pn_data_enter(data);
+    for (size_t i = 0; i < count && pn_data_next(data); ++i) { // also stops if pn_data_next() fails early
+        qpid::types::Variant e;
+        if (read(e)) value.push_back(e); // elements whose type read() rejects are dropped
+    }
+    pn_data_exit(data);
+}
+
+void PnData::readMap(qpid::types::Variant::Map& value) // Add the entries of the map node at the cursor to value.
+{
+    size_t count = pn_data_get_map(data); // map accessor (not the list one); counts keys and values, hence count/2 pairs
+    pn_data_enter(data);
+    for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
+        std::string key = str(pn_data_type(data) == PN_STRING ? pn_data_get_string(data) : pn_data_get_symbol(data)); // write() emits string keys, peers may send symbols
+        pn_data_next(data); // step from the key to its value
+        qpid::types::Variant e;
+        if (read(e)) value[key]= e; // entries whose value type read() rejects are skipped
+    }
+    pn_data_exit(data);
+}
+
+void PnData::readArray(qpid::types::Variant::List& value) // Append the elements of the array node at the cursor to value.
+{
+    size_t count = pn_data_get_array(data);
+    pn_type_t type = pn_data_get_array_type(data); // element type, queried once before entering the array
+    pn_data_enter(data);
+    for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+        qpid::types::Variant e;
+        if (read(type, e)) value.push_back(e); // decode with the array's type instead of asking each node
+    }
+    pn_data_exit(data);
+}
+
+bool PnData::read(pn_type_t type, qpid::types::Variant& value) // Decode the node at the cursor as AMQP type `type` into value; false if the type is not handled.
+{
+    switch (type) {
+      case PN_NULL:
+        if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant(); // reset to void only when needed
+        return true;
+      case PN_BOOL:
+        value = pn_data_get_bool(data);
+        return true;
+      case PN_UBYTE:
+        value = pn_data_get_ubyte(data);
+        return true;
+      case PN_BYTE:
+        value = pn_data_get_byte(data);
+        return true;
+      case PN_USHORT:
+        value = pn_data_get_ushort(data);
+        return true;
+      case PN_SHORT:
+        value = pn_data_get_short(data);
+        return true;
+      case PN_UINT:
+        value = pn_data_get_uint(data);
+        return true;
+      case PN_INT:
+        value = pn_data_get_int(data);
+        return true;
+      case PN_CHAR:
+        value = pn_data_get_char(data);
+        return true;
+      case PN_ULONG:
+        value = pn_data_get_ulong(data);
+        return true;
+      case PN_LONG:
+        value = pn_data_get_long(data);
+        return true;
+      case PN_TIMESTAMP:
+        value = pn_data_get_timestamp(data);
+        return true;
+      case PN_FLOAT:
+        value = pn_data_get_float(data);
+        return true;
+      case PN_DOUBLE:
+        value = pn_data_get_double(data);
+        return true;
+      case PN_UUID:
+        value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
+        return true;
+      case PN_BINARY:
+        value = str(pn_data_get_binary(data));
+        value.setEncoding(qpid::types::encodings::BINARY);
+        return true;
+      case PN_STRING:
+        value = str(pn_data_get_string(data));
+        value.setEncoding(qpid::types::encodings::UTF8);
+        return true;
+      case PN_SYMBOL:
+        value = str(pn_data_get_symbol(data)); // symbol accessor: the string accessor yields nothing for a symbol node
+        value.setEncoding(qpid::types::encodings::ASCII);
+        return true;
+      case PN_LIST:
+        value = qpid::types::Variant::List();
+        readList(value.asList()); // fill the list held by value in place
+        return true;
+      case PN_MAP:
+        value = qpid::types::Variant::Map();
+        readMap(value.asMap()); // fill the map held by value in place
+        return true;
+      case PN_ARRAY:
+        value = qpid::types::Variant::List();
+        readArray(value.asList()); // arrays are surfaced as a Variant list
+        return true;
+      // Described and decimal values are not converted; value is left untouched.
+      case PN_DESCRIBED:
+      case PN_DECIMAL32:
+      case PN_DECIMAL64:
+      case PN_DECIMAL128:
+      default:
+        return false;
+    }
+
+}
+
+pn_bytes_t PnData::str(const std::string& s) // Describe the bytes of s as a pn_bytes_t without copying them.
+{
+    pn_bytes_t result;
+    result.start = const_cast<char*>(s.data()); // points into s, so s must stay alive while the result is used
+    result.size = s.size();
+    return result;
+}
+
+std::string PnData::str(const pn_bytes_t& in) // Copy the in.size bytes starting at in.start into a std::string.
+{
+    return std::string(in.start, in.size); // length-based constructor, so embedded NUL bytes are kept
+}
+
+}}} // namespace qpid::messaging::amqp

Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h?rev=1585588&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h Mon Apr  7 21:22:55 2014
@@ -0,0 +1,60 @@
+#ifndef QPID_MESSAGING_AMQP_PNDATA_H
+#define QPID_MESSAGING_AMQP_PNDATA_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+/**
+ *  Helper class to read/write messaging types to/from pn_data_t.
+ */
+class PnData
+{
+  public:
+    PnData(pn_data_t* d) : data(d) {} // keeps the pointer only; this class declares no destructor and never frees d
+
+    void write(const types::Variant& value);      // append one value
+    void write(const types::Variant::Map& map);   // append a map and its entries
+    void write(const types::Variant::List& list); // append a list and its elements
+
+    bool read(pn_type_t type, types::Variant& value); // decode as the given type; false if not handled
+    bool read(types::Variant& value);                 // decode using the type reported by pn_data_type()
+    void readList(types::Variant::List& value);       // append elements of a list node to value
+    void readMap(types::Variant::Map& value);         // add entries of a map node to value
+    void readArray(types::Variant::List& value);      // append elements of an array node to value
+
+    static pn_bytes_t str(const std::string&);  // std::string -> pn_bytes_t (refers to the string's storage)
+    static std::string str(const pn_bytes_t&);  // pn_bytes_t -> std::string (copies the bytes)
+
+  private:
+    pn_data_t* data; // the pn_data_t every read/write call above operates on
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif  /*!QPID_MESSAGING_AMQP_PNDATA_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/PnData.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Mon Apr  7 21:22:55 2014
@@ -21,19 +21,47 @@
 
 import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
 import qpid, traceback, signal
-from qpid import connection, messaging, util
+from qpid import connection, util
 from qpid.compat import format_exc
 from qpid.harness import Skipped
 from unittest import TestCase
 from copy import copy
 from threading import Thread, Lock, Condition
 from logging import getLogger
+from qpidtoollibs import BrokerAgent
 
-try: import qmf.console
-except: print "Cannot import module qmf.console, skipping tests"; exit(0);
-
+# NOTE: Always import native client qpid.messaging, import swigged client
+# qpid_messaging if possible. qpid_messaging is set to None if not available.
+#
+# qm is set to qpid_messaging if it is available, qpid.messaging if not.
+# Use qm.X to specify names from the default messaging module.
+#
+# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading.
+#
+# BrokerTest can be configured to determine which protocol is used by default:
+#
+# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client
+#  is being used, amqp0-10 if native client is being used.
+#
+# The configured defaults can be over-ridden on BrokerTest.connect and some
+# other methods by specifying native=True|False and protocol="amqpX"
+#
+
+import qpid.messaging
+qm = qpid.messaging
+qpid_messaging = None
+if not os.environ.get("QPID_PY_NO_SWIG"):
+    try:
+        import qpid_messaging
+        from qpid.datatypes import uuid4
+        qm = qpid_messaging
+        # Silence warnings from swigged messaging library unless enabled in environment.
+        if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+            qm.Logger.configure(["--log-enable=error"])
+    except ImportError:
+        print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
 
-log = getLogger("qpid.brokertest")
+log = getLogger("brokertest")
 
 # Values for expected outcome of process at end of test
 EXPECT_EXIT_OK=1           # Expect to exit with 0 status before end of test.
@@ -149,7 +177,7 @@ class Popen(subprocess.Popen):
         err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
         raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
 
-    def stop(self):                  # Clean up at end of test.
+    def teardown(self):         # Clean up at end of test.
         try:
             if self.expect == EXPECT_UNKNOWN:
                 try: self.kill()            # Just make sure its dead
@@ -253,14 +281,16 @@ class Broker(Popen):
 
         self.test = test
         self._port=port
+        args = copy(args)
+        if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib]
         if BrokerTest.store_lib and not test_store:
-            args = args + ['--load-module', BrokerTest.store_lib]
+            args += ['--load-module', BrokerTest.store_lib]
             if BrokerTest.sql_store_lib:
-                args = args + ['--load-module', BrokerTest.sql_store_lib]
-                args = args + ['--catalog', BrokerTest.sql_catalog]
+                args += ['--load-module', BrokerTest.sql_store_lib]
+                args += ['--catalog', BrokerTest.sql_catalog]
             if BrokerTest.sql_clfs_store_lib:
-                args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
-                args = args + ['--catalog', BrokerTest.sql_catalog]
+                args += ['--load-module', BrokerTest.sql_clfs_store_lib]
+                args += ['--catalog', BrokerTest.sql_catalog]
         cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args
         if not "--auth" in args: cmd.append("--auth=no")
         if wait != None:
@@ -288,13 +318,11 @@ class Broker(Popen):
         cmd += ["--data-dir", self.datadir]
         if show_cmd: print cmd
         Popen.__init__(self, cmd, expect, stdout=PIPE)
-        test.cleanup_stop(self)
+        test.teardown_add(self)
         self._host = "127.0.0.1"
-        log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+        self._agent = None
 
-    def startQmf(self, handler=None):
-        self.qmf_session = qmf.console.Session(handler)
-        self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+        log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
 
     def host(self): return self._host
 
@@ -310,22 +338,25 @@ class Broker(Popen):
     def unexpected(self,msg):
         raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
 
-    def connect(self, timeout=5, **kwargs):
-        """New API connection to the broker."""
-        return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs)
-
-    def connect_old(self):
-        """Old API connection to the broker."""
-        socket = qpid.util.connect(self.host(),self.port())
-        connection = qpid.connection.Connection (sock=socket)
-        connection.start()
-        return connection;
+    def connect(self, timeout=5, native=False, **kwargs):
+        """New API connection to the broker.
+        @param native if True force use of the native qpid.messaging client
+        even if swig client is available.
+        """
+        if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol)
+        if native: connection_class = qpid.messaging.Connection
+        else: connection_class = qm.Connection
+        return connection_class.establish(self.host_port(), timeout=timeout, **kwargs)
+
+    @property
+    def agent(self, **kwargs):
+        """Return a BrokerAgent for this broker"""
+        if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs))
+        return self._agent
+
 
     def declare_queue(self, queue):
-        c = self.connect_old()
-        s = c.session(str(qpid.datatypes.uuid4()))
-        s.queue_declare(queue=queue)
-        c.close()
+        self.agent.addQueue(queue)
 
     def _prep_sender(self, queue, durable, xprops):
         s = queue + "; {create:always, node:{durable:" + str(durable)
@@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, tr
         contents = []
         try:
             while True: contents.append(transform(r.fetch(timeout=timeout)))
-        except messaging.Empty: pass
+        except qm.Empty: pass
     finally: r.close()
     return contents
 
@@ -451,30 +482,42 @@ class BrokerTest(TestCase):
     def configure(self, config): self.config=config
 
     def setUp(self):
-        outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
+        defs = self.config.defines
+        outdir = defs.get("OUTDIR") or "brokertest.tmp"
         self.dir = os.path.join(self.rootdir, outdir, self.id())
         os.makedirs(self.dir)
         os.chdir(self.dir)
-        self.stopem = []                # things to stop at end of test
+        self.teardown_list = []                # things to tear down at end of test
+
+        self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10")
+        self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
+
 
     def tearDown(self):
         err = []
-        for p in self.stopem:
-            try: p.stop()
-            except Exception, e: err.append(str(e))
-        self.stopem = []                # reset in case more processes start
+        self.teardown_list.reverse() # Tear down in reverse order
+        for p in self.teardown_list:
+            log.debug("Tearing down %s", p)
+            try:
+                # Call the first of the methods that is available on p.
+                for m in ["teardown", "close"]:
+                    a = getattr(p, m, None)
+                    if a: a(); break
+                else: raise Exception("Don't know how to tear down %s", p)
+            except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e)))
+        self.teardown_list = []                # reset in case more processes start
         os.chdir(self.rootdir)
         if err: raise Exception("Unexpected process status:\n    "+"\n    ".join(err))
 
-    def cleanup_stop(self, stopable):
-        """Call thing.stop at end of test"""
-        self.stopem.append(stopable)
+    def teardown_add(self, thing):
+        """Call thing.teardown() or thing.close() at end of test"""
+        self.teardown_list.append(thing)
 
     def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
         """Start a process that will be killed at end of test, in the test dir."""
         os.chdir(self.dir)
         p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
-        self.cleanup_stop(p)
+        self.teardown_add(p)
         return p
 
     def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
@@ -490,6 +533,11 @@ class BrokerTest(TestCase):
     def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
     def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
 
+    def protocol_option(self, connection_options=""):
+        if "protocol" in connection_options: return connection_options
+        else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol]))
+
+
 def join(thread, timeout=30):
     thread.join(timeout)
     if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
@@ -524,7 +572,7 @@ class NumberedSender(Thread):
 
     def __init__(self, broker, max_depth=None, queue="test-queue",
                  connection_options=RECONNECT_OPTIONS,
-                 failover_updates=True, url=None, args=[]):
+                 failover_updates=False, url=None, args=[]):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
         Requires self.notify_received(n) to be called each time messages are received.
@@ -533,7 +581,7 @@ class NumberedSender(Thread):
         cmd = ["qpid-send",
                "--broker", url or broker.host_port(),
                "--address", "%s;{create:always}"%queue,
-               "--connection-options", "{%s}"%(connection_options),
+               "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
                "--content-stdin"
                ] + args
         if failover_updates: cmd += ["--failover-updates"]
@@ -592,7 +640,7 @@ class NumberedReceiver(Thread):
     """
     def __init__(self, broker, sender=None, queue="test-queue",
                  connection_options=RECONNECT_OPTIONS,
-                 failover_updates=True, url=None, args=[]):
+                 failover_updates=False, url=None, args=[]):
         """
         sender: enable flow control. Call sender.received(n) for each message received.
         """
@@ -601,7 +649,7 @@ class NumberedReceiver(Thread):
         cmd = ["qpid-receive",
                "--broker", url or broker.host_port(),
                "--address", "%s;{create:always}"%queue,
-               "--connection-options", "{%s}"%(connection_options),
+               "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
                "--forever"
                ]
         if failover_updates: cmd += [ "--failover-updates" ]

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1585588&r1=1585587&r2=1585588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Apr  7 21:22:55 2014
@@ -20,8 +20,6 @@
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from qpid.datatypes import uuid4, UUID
 from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
@@ -44,7 +42,7 @@ class LogLevel:
 class QmfAgent(object):
     """Access to a QMF broker agent."""
     def __init__(self, address, **kwargs):
-        self._connection = Connection.establish(
+        self._connection = qm.Connection.establish(
             address, client_properties={"qpid.ha-admin":1}, **kwargs)
         self._agent = BrokerAgent(self._connection)
 
@@ -105,9 +103,9 @@ class HaPort:
         self.port = self.socket.getsockname()[1]
         self.fileno = self.socket.fileno()
         self.stopped = False
-        test.cleanup_stop(self) # Stop during test.tearDown
+        test.teardown_add(self) # Stop during test.tearDown
 
-    def stop(self):             # Called in tearDown
+    def teardown(self):             # Called in tearDown
         if not self.stopped:
             self.stopped = True
             self.socket.shutdown(socket.SHUT_RDWR)
@@ -180,6 +178,7 @@ acl allow all all
     def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
     def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
     def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+    @property
     def agent(self):
         if not self._agent:
             cred = self.client_credentials
@@ -190,7 +189,7 @@ acl allow all all
         return self._agent
 
     def qmf(self):
-        hb = self.agent().getHaBroker()
+        hb = self.agent.getHaBroker()
         hb.update()
         return hb
 
@@ -203,19 +202,19 @@ acl allow all all
             try:
                 self._status = self.ha_status()
                 return self._status == status;
-            except ConnectionError: return False
+            except qm.ConnectionError: return False
         assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
-    def wait_queue(self, queue, timeout=1):
+    def wait_queue(self, queue, timeout=1, msg="wait_queue"):
         """ Wait for queue to be visible via QMF"""
-        agent = self.agent()
-        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+        agent = self.agent
+        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
 
-    def wait_no_queue(self, queue, timeout=1):
+    def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
         """ Wait for queue to be invisible via QMF"""
-        agent = self.agent()
-        assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+        agent = self.agent
+        assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
 
     # TODO aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -273,12 +272,12 @@ acl allow all all
     def assert_connect_fail(self):
         try:
             self.connect()
-            self.test.fail("Expected ConnectionError")
-        except ConnectionError: pass
+            self.test.fail("Expected qm.ConnectionError")
+        except qm.ConnectionError: pass
 
     def try_connect(self):
         try: return self.connect()
-        except ConnectionError: return None
+        except qm.ConnectionError: return None
 
     def ready(self, *args, **kwargs):
         if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
@@ -286,7 +285,7 @@ acl allow all all
         return Broker.ready(self, *args, **kwargs)
 
     def kill(self, final=True):
-        if final: self.ha_port.stop()
+        if final: self.ha_port.teardown()
         self._agent = None
         return Broker.kill(self)
 
@@ -355,9 +354,9 @@ class HaCluster(object):
             b.set_brokers_url(self.url)
             b.set_public_url(self.url)
 
-    def connect(self, i):
+    def connect(self, i, **kwargs):
         """Connect with reconnect_urls"""
-        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
 
     def kill(self, i, promote_next=True, final=True):
         """Kill broker i, promote broker i+1"""
@@ -393,7 +392,7 @@ def wait_address(session, address):
     """Wait for an address to become valid."""
     def check():
         try: session.sender(address); return True
-        except NotFound: return False
+        except  qm.NotFound: return False
     assert retry(check), "Timed out waiting for address %s"%(address)
 
 def valid_address(session, address):
@@ -401,6 +400,6 @@ def valid_address(session, address):
     try:
         session.receiver(address)
         return True
-    except NotFound: return False
+    except qm.NotFound: return False
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org