You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [27/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/python/qpid/delegates.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/delegates.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/delegates.py (original)
+++ qpid/branches/asyncstore/python/qpid/delegates.py Fri Aug  3 12:13:32 2012
@@ -24,13 +24,7 @@ from exceptions import VersionError, Clo
 from logging import getLogger
 from ops import Control
 import sys
-
-_have_sasl = None
-try:
-  import saslwrapper
-  _have_sasl = True
-except:
-  pass
+from qpid import sasl
 
 log = getLogger("qpid.io.ctl")
 
@@ -172,20 +166,19 @@ class Client(Delegate):
     self.username  = username
     self.password  = password
 
-    if _have_sasl:
-      self.sasl = saslwrapper.Client()
-      if username and len(username) > 0:
-        self.sasl.setAttr("username", str(username))
-      if password and len(password) > 0:
-        self.sasl.setAttr("password", str(password))
-      self.sasl.setAttr("service", str(kwargs.get("service", "qpidd")))
-      if "host" in kwargs:
-        self.sasl.setAttr("host", str(kwargs["host"]))
-      if "min_ssf" in kwargs:
-        self.sasl.setAttr("minssf", kwargs["min_ssf"])
-      if "max_ssf" in kwargs:
-        self.sasl.setAttr("maxssf", kwargs["max_ssf"])
-      self.sasl.init()
+    self.sasl = sasl.Client()
+    if username and len(username) > 0:
+      self.sasl.setAttr("username", str(username))
+    if password and len(password) > 0:
+      self.sasl.setAttr("password", str(password))
+    self.sasl.setAttr("service", str(kwargs.get("service", "qpidd")))
+    if "host" in kwargs:
+      self.sasl.setAttr("host", str(kwargs["host"]))
+    if "min_ssf" in kwargs:
+      self.sasl.setAttr("minssf", kwargs["min_ssf"])
+    if "max_ssf" in kwargs:
+      self.sasl.setAttr("maxssf", kwargs["max_ssf"])
+    self.sasl.init()
 
   def start(self):
     # XXX
@@ -204,39 +197,29 @@ class Client(Delegate):
         mech_list += str(mech) + " "
     mech = None
     initial = None
-    if _have_sasl:
-      status, mech, initial = self.sasl.start(mech_list)
-      if status == False:
-        raise Closed("SASL error: %s" % self.sasl.getError())
-    else:
-      if self.username and self.password and ("PLAIN" in mech_list):
-        mech = "PLAIN"
-        initial = "\0%s\0%s" % (self.username, self.password)
-      else:
-        mech = "ANONYMOUS"
-        if not mech in mech_list:
-          raise Closed("No acceptable SASL authentication mechanism available")
+    try:
+      mech, initial = self.sasl.start(mech_list)
+    except Exception, e:
+      raise Closed(str(e))
     ch.connection_start_ok(client_properties=self.client_properties,
                            mechanism=mech, response=initial)
 
   def connection_secure(self, ch, secure):
     resp = None
-    if _have_sasl:
-      status, resp = self.sasl.step(secure.challenge)
-      if status == False:
-        raise Closed("SASL error: %s" % self.sasl.getError())
+    try:
+      resp = self.sasl.step(secure.challenge)
+    except Exception, e:
+      raise Closed(str(e))
     ch.connection_secure_ok(response=resp)
 
   def connection_tune(self, ch, tune):
     ch.connection_tune_ok(heartbeat=self.heartbeat)
     ch.connection_open()
-    if _have_sasl:
-      self.connection.user_id = self.sasl.getUserId()
-      self.connection.security_layer_tx = self.sasl
+    self.connection.user_id = self.sasl.auth_username()
+    self.connection.security_layer_tx = self.sasl
 
   def connection_open_ok(self, ch, open_ok):
-    if _have_sasl:
-      self.connection.security_layer_rx = self.sasl
+    self.connection.security_layer_rx = self.sasl
     self.connection.opened = True
     notify(self.connection.condition)
 

Modified: qpid/branches/asyncstore/python/qpid/framer.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/framer.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/framer.py (original)
+++ qpid/branches/asyncstore/python/qpid/framer.py Fri Aug  3 12:13:32 2012
@@ -51,9 +51,10 @@ class Framer(Packer):
     self.sock_lock.acquire()
     try:
       if self.security_layer_tx:
-        status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
-        if status == False:
-          raise Closed(self.security_layer_tx.getError())
+        try:
+          cipher_buf = self.security_layer_tx.encode(self.tx_buf)
+        except SASLError, e:
+          raise Closed(str(e))
         self._write(cipher_buf)
       else:
         self._write(self.tx_buf)
@@ -91,9 +92,10 @@ class Framer(Packer):
       try:
         s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
         if self.security_layer_rx:
-          status, s = self.security_layer_rx.decode(s)
-          if status == False:
-            raise Closed(self.security_layer_tx.getError())
+          try:
+            s = self.security_layer_rx.decode(s)
+          except SASLError, e:
+            raise Closed(str(e))
       except socket.timeout:
         if self.aborted():
           raise Closed()

Modified: qpid/branches/asyncstore/python/qpid/messaging/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/messaging/util.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/messaging/util.py (original)
+++ qpid/branches/asyncstore/python/qpid/messaging/util.py Fri Aug  3 12:13:32 2012
@@ -50,10 +50,13 @@ def set_reconnect_urls(conn, msg):
   reconnect_urls = []
   urls = msg.properties["amq.failover"]
   for u in urls:
+    # FIXME aconway 2012-06-12: Nasty hack parsing of the C++ broker's URL format.
     if u.startswith("amqp:"):
-      for p in u[5:].split(","):
-        parts = p.split(":")
-        host, port = parts[1:3]
+      for a in u[5:].split(","):
+        parts = a.split(":")
+        # Handle IPv6 addresses which have : in the host part.
+        port = parts[-1]        # Last : separated field is port
+        host = ":".join(parts[1:-1]) # First : separated field is protocol, host is the rest.
         reconnect_urls.append("%s:%s" % (host, port))
   conn.reconnect_urls = reconnect_urls
   log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)

Modified: qpid/branches/asyncstore/python/qpid/sasl.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/sasl.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/sasl.py (original)
+++ qpid/branches/asyncstore/python/qpid/sasl.py Fri Aug  3 12:13:32 2012
@@ -29,6 +29,9 @@ class WrapperClient:
 
   def setAttr(self, name, value):
     status = self._cli.setAttr(str(name), str(value))
+    if status and name == 'username':
+      status = self._cli.setAttr('externaluser', str(value))
+      
     if not status:
       raise SASLError(self._cli.getError())
 

Modified: qpid/branches/asyncstore/python/qpid/selector.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/selector.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/selector.py (original)
+++ qpid/branches/asyncstore/python/qpid/selector.py Fri Aug  3 12:13:32 2012
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import atexit, time
+import atexit, time, errno
 from compat import select, set, selectable_waiter
 from threading import Thread, Lock
 
@@ -111,12 +111,24 @@ class Selector:
           else:
             wakeup = min(wakeup, t)
 
-      if wakeup is None:
-        timeout = None
-      else:
-        timeout = max(0, wakeup - time.time())
+      rd = []
+      wr = []
+      ex = []
 
-      rd, wr, ex = select(self.reading, self.writing, (), timeout)
+      while True:
+        try:
+          if wakeup is None:
+            timeout = None
+          else:
+            timeout = max(0, wakeup - time.time())
+          rd, wr, ex = select(self.reading, self.writing, (), timeout)
+          break
+        except Exception, (err, strerror):
+          # Repeat the select call if we were interrupted.
+          if err == errno.EINTR:
+            continue
+          else:
+            raise
 
       for sel in wr:
         if sel.writing():

Modified: qpid/branches/asyncstore/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/tests/messaging/endpoints.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/branches/asyncstore/python/qpid/tests/messaging/endpoints.py Fri Aug  3 12:13:32 2012
@@ -1333,3 +1333,15 @@ class SenderTests(Base):
     self.drain(self.rcv, expected=msgs)
     self.ssn.acknowledge()
     assert caught, "did not exceed capacity"
+
+  def testEINTR(self):
+    m1 = self.content("testEINTR", 0)
+    m2 = self.content("testEINTR", 1)
+
+    self.snd.send(m1, timeout=self.timeout())
+    try:
+      os.setuid(500)
+      assert False, "setuid should fail"
+    except:
+      pass
+    self.snd.send(m2, timeout=self.timeout())

Modified: qpid/branches/asyncstore/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/qpid/util.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/qpid/util.py (original)
+++ qpid/branches/asyncstore/python/qpid/util.py Fri Aug  3 12:13:32 2012
@@ -25,9 +25,9 @@ except ImportError:
   from socket import ssl as wrap_socket
   class ssl:
 
-    def __init__(self, sock):
+    def __init__(self, sock, keyfile=None, certfile=None, trustfile=None):
       self.sock = sock
-      self.ssl = wrap_socket(sock)
+      self.ssl = wrap_socket(sock, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)
 
     def recv(self, n):
       return self.ssl.read(n)

Modified: qpid/branches/asyncstore/python/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/python/setup.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/python/setup.py (original)
+++ qpid/branches/asyncstore/python/setup.py Fri Aug  3 12:13:32 2012
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
     return outfiles + extra
 
 setup(name="qpid-python",
-      version="0.17",
+      version="0.19",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",

Propchange: qpid/branches/asyncstore/specs/amqp.xsl
------------------------------------------------------------------------------
    svn:mime-type = application/xslt+xml

Modified: qpid/branches/asyncstore/specs/apache-filters.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/specs/apache-filters.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/specs/apache-filters.xml (original)
+++ qpid/branches/asyncstore/specs/apache-filters.xml Fri Aug  3 12:13:32 2012
@@ -17,7 +17,6 @@
    specific language governing permissions and limitations
    under the License.
 -->
-<!DOCTYPE amqp SYSTEM "amqp.dtd">
 <?xml-stylesheet type="text/xsl" href="amqp.xsl"?>
 
 <amqp name="apache-filters" label="Apache Proposed AMQP 1-0 Filters">
@@ -175,9 +174,9 @@ JMSType           | annotation jms-type 
           where <i>field_name</i> is the appropriate AMQP 1.0 field
           named in the table above, with the hyphen replaced by an
           underscore. For example, the selector: "JMSCorrelationID =
-          ’abc’ AND color = ’blue’ AND weight > 2500" would be
-          transferred over the wire as: "amqp.correlation_id = ’abc’
-          AND color = ’blue’ AND weight > 2500"
+          'abc' AND color = 'blue' AND weight > 2500" would be
+          transferred over the wire as: "amqp.correlation_id = 'abc'
+          AND color = 'blue' AND weight > 2500"
         </p>
         <p>
           The "properties" of the JMS message are equivalent to the AMQP application-properties

Propchange: qpid/branches/asyncstore/specs/apache-filters.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: qpid/branches/asyncstore/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/specs/management-schema.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/specs/management-schema.xml (original)
+++ qpid/branches/asyncstore/specs/management-schema.xml Fri Aug  3 12:13:32 2012
@@ -8,9 +8,9 @@
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at
-  
+
     http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -81,7 +81,6 @@
     <property name="systemRef"        type="objId"  references="System" access="RO" desc="System ID" parentRef="y"/>
     <property name="port"             type="uint16" access="RO" desc="TCP Port for AMQP Service"/>
     <property name="workerThreads"    type="uint16" access="RO" desc="Thread pool size"/>
-    <property name="maxConns"         type="uint16" access="RO" desc="Maximum allowed connections"/>
     <property name="connBacklog"      type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
     <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
     <property name="mgmtPublish"      type="bool"   access="RO" desc="Broker's management agent sends unsolicited data on the publish interval"/>
@@ -164,20 +163,20 @@
 
     <method name="create" desc="Create an object of the specified type">
       <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> 
-      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> 
-      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> 
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
+      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
+      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/>
     </method>
 
     <method name="delete" desc="Delete an object of the specified type">
       <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> 
-      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> 
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
+      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
     </method>
 
     <method name="query" desc="Query the current state of an object.">
       <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> 
+      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
       <arg name="results" dir="O" type="map"  desc="A snapshot of the object's state."/>
     </method>
 
@@ -220,7 +219,7 @@
 
     <property name="durable"     type="bool"  access="RC"/>
     <property name="autoDelete"  type="bool"  access="RC"/>
-    <property name="exclusive"   type="bool"  access="RC"/>
+    <property name="exclusive"   type="bool"  access="RO"/>
     <property name="arguments"   type="map"   access="RO" desc="Arguments supplied in queue.declare"/>
     <property name="altExchange" type="objId" references="Exchange" access="RO" optional="y"/>
 
@@ -321,7 +320,7 @@
 
     <statistic name="msgMatched" type="count64"/>
   </class>
-  
+
   <!--
   ===============================================================
   Subscription
@@ -338,7 +337,7 @@
     <property name="arguments"      type="map"      access="RC"/>
     <statistic name="delivered"     type="count64"  unit="message" desc="Messages delivered"/>
   </class>
-  
+
   <!--
   ===============================================================
   Connection
@@ -366,7 +365,7 @@
     <statistic name="msgsFromClient"  type="count64"/>
     <statistic name="msgsToClient"    type="count64"/>
 
-    <method name="close"/> 
+    <method name="close"/>
   </class>
 
   <!--
@@ -379,15 +378,17 @@
     This class represents an inter-broker connection.
 
     <property name="vhostRef"  type="objId"  references="Vhost" access="RC" index="y" parentRef="y"/>
-    <property name="host"      type="sstr"   access="RC" index="y"/>
-    <property name="port"      type="uint16" access="RC" index="y"/>
-    <property name="transport" type="sstr"   access="RC"/>
+    <property name="name"      type="sstr"   access="RC" index="y"/>
+    <property name="host"      type="sstr"   access="RO"/>
+    <property name="port"      type="uint16" access="RO"/>
+    <property name="transport" type="sstr"   access="RO"/>
     <property name="durable"   type="bool"   access="RC"/>
+    <property name="connectionRef" type="objId" references="Connection" access="RO"/>
 
     <statistic name="state"       type="sstr" desc="Operational state of the link"/>
     <statistic name="lastError"   type="lstr" desc="Reason link is not operational"/>
 
-    <method name="close"/> 
+    <method name="close"/>
 
     <method name="bridge" desc="Bridge messages over the link">
       <arg name="durable"     dir="I" type="bool"/>
@@ -411,7 +412,8 @@
   -->
   <class name="Bridge">
     <property name="linkRef"     type="objId"  references="Link" access="RC" index="y" parentRef="y"/>
-    <property name="channelId"   type="uint16" access="RC" index="y"/>
+    <property name="name"        type="sstr"   access="RC"  index="y"/>
+    <property name="channelId"   type="uint16" access="RO"/>
     <property name="durable"     type="bool"   access="RC"/>
     <property name="src"         type="sstr"   access="RC"/>
     <property name="dest"        type="sstr"   access="RC"/>
@@ -422,7 +424,7 @@
     <property name="excludes"    type="sstr"   access="RC"/>
     <property name="dynamic"     type="bool"   access="RC"/>
     <property name="sync"        type="uint16" access="RC"/>
-    <method name="close"/> 
+    <method name="close"/>
   </class>
 
 
@@ -441,7 +443,7 @@
     <property name="expireTime"       type="absTime" access="RO" optional="y"/>
     <property name="maxClientRate"    type="uint32"  access="RO" unit="msgs/sec" optional="y"/>
 
-    <statistic name="framesOutstanding" type="count32"/>
+    <statistic name="unackedMessages" type="uint64" unit="message" desc="Unacknowledged messages in the session"/>
 
     <statistic name="TxnStarts"    type="count64"  unit="transaction" desc="Total transactions started "/>
     <statistic name="TxnCommits"   type="count64"  unit="transaction" desc="Total transactions committed"/>

Modified: qpid/branches/asyncstore/tests/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tests/setup.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tests/setup.py (original)
+++ qpid/branches/asyncstore/tests/setup.py Fri Aug  3 12:13:32 2012
@@ -20,7 +20,7 @@
 from distutils.core import setup
 
 setup(name="qpid-tests",
-      version="0.17",
+      version="0.19",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9",

Modified: qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Aug  3 12:13:32 2012
@@ -36,3 +36,4 @@ from extensions import *
 from msg_groups import *
 from new_api import *
 from stats import *
+from qmf_events import *

Modified: qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/management.py Fri Aug  3 12:13:32 2012
@@ -302,9 +302,10 @@ class ManagementTest (TestBase010):
 
         twenty = range(1,21)
         props = session.delivery_properties(routing_key="routing_key")
+        mp    = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'})
         for count in twenty:
             body = "Reroute Message %d" % count
-            msg = Message(props, body)
+            msg = Message(props, mp, body)
             session.message_transfer(destination="amq.direct", message=msg)
 
         pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
@@ -317,6 +318,16 @@ class ManagementTest (TestBase010):
         self.assertEqual(pq.msgDepth,19)
         self.assertEqual(aq.msgDepth,1)
 
+        "Verify that the trace was cleared on the rerouted message"
+        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+        conn = qpid.messaging.Connection(url)
+        conn.open()
+        sess = conn.session()
+        rx = sess.receiver("alt-queue1;{mode:browse}")
+        rm = rx.fetch(1)
+        self.assertEqual(rm.properties['x-qpid.trace'], '')
+        conn.close()
+
         "Reroute top 9 messages from reroute-queue to alt.direct2"
         result = pq.reroute(9, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 

Modified: qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/query.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/query.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/query.py (original)
+++ qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_10/query.py Fri Aug  3 12:13:32 2012
@@ -26,7 +26,7 @@ class QueryTests(TestBase010):
 
     def test_queue_query(self):
         session = self.session
-        session.queue_declare(queue="my-queue", exclusive=True)
+        session.queue_declare(queue="my-queue", exclusive=True, auto_delete=True)
         result = session.queue_query(queue="my-queue")
         self.assertEqual("my-queue", result.queue)
 

Propchange: qpid/branches/asyncstore/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:r1333988-1368650

Modified: qpid/branches/asyncstore/tools/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/setup.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/setup.py (original)
+++ qpid/branches/asyncstore/tools/setup.py Fri Aug  3 12:13:32 2012
@@ -20,7 +20,7 @@
 from distutils.core import setup
 
 setup(name="qpid-tools",
-      version="0.17",
+      version="0.19",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       package_dir={'' : 'src/py'},

Modified: qpid/branches/asyncstore/tools/src/py/.gitignore
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/.gitignore?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/.gitignore (original)
+++ qpid/branches/asyncstore/tools/src/py/.gitignore Fri Aug  3 12:13:32 2012
@@ -19,4 +19,5 @@
 # with the License.  You may obtain a copy of the License at
 /qpid-clusterc
 /qpid-configc
+/qpid-hac
 /qpid-routec

Modified: qpid/branches/asyncstore/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-cluster?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-cluster (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-cluster Fri Aug  3 12:13:32 2012
@@ -64,17 +64,19 @@ class IpAddr:
         return bestAddr
 
 class BrokerManager:
-    def __init__(self, config):
-        self.config = config
-        self.brokerName = None
-        self.qmf        = None
-        self.broker     = None
-        self.brokers    = []
+    def __init__(self, config, conn_options):
+        self.config       = config
+        self.cert         = None
+        self.conn_options = conn_options
+        self.brokerName   = None
+        self.qmf          = None
+        self.broker       = None
+        self.brokers      = []
 
     def SetBroker(self, brokerUrl):
         self.url = brokerUrl
         self.qmf = Session()
-        self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
+        self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout, **self.conn_options)
         agents = self.qmf.getAgents()
         for a in agents:
             if a.getAgentBank() == '0':
@@ -240,6 +242,8 @@ def main(argv=None):
                       description="Example: $ qpid-cluster -C  broker-host:10000")
 
         parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+        parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+        parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
         parser.add_option("-C", "--all-connections", action="store_true", default=False, help="View client connections to all cluster members")
         parser.add_option("-c", "--connections",  metavar="ID", help="View client connections to specified member")
         parser.add_option("-d", "--del-connection",  metavar="HOST:PORT", help="Disconnect a client connection")
@@ -280,7 +284,13 @@ def main(argv=None):
         config._force = opts.force
         config._numeric = opts.numeric
 
-        bm    = BrokerManager(config)
+        conn_options = {}
+        if opts.sasl_mechanism:
+            conn_options['mechanisms'] = opts.sasl_mechanism
+        if opts.ssl_certificate:
+            conn_options['ssl_certfile'] = opts.ssl_certificate
+
+        bm = BrokerManager(config, conn_options)
 
         try:
             bm.SetBroker(config._host)
@@ -303,7 +313,6 @@ def main(argv=None):
 
         bm.Disconnect()
     except Exception, e:
-        raise
         print str(e)
         return 1
 

Modified: qpid/branches/asyncstore/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-config?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-config (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-config Fri Aug  3 12:13:32 2012
@@ -88,7 +88,6 @@ class Config:
         self._altern_ex         = None
         self._durable           = False
         self._replicate       = None
-        self._ha_admin          = False
         self._clusterDurable    = False
         self._if_empty          = True
         self._if_unused         = True
@@ -102,7 +101,6 @@ class Config:
         self._ive               = False
         self._eventGeneration   = None
         self._file              = None
-        self._sasl_mechanism    = None
         self._flowStopCount     = None
         self._flowResumeCount   = None
         self._flowStopSize      = None
@@ -114,6 +112,7 @@ class Config:
         self._returnCode        = 0
 
 config = Config()
+conn_options = {}
 
 FILECOUNT = "qpid.file_count"
 FILESIZE  = "qpid.file_size"
@@ -177,6 +176,9 @@ def OptionsAndArguments(argv):
     group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list")
     group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
     group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+    group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group1)
 
     group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues")
@@ -187,7 +189,6 @@ def OptionsAndArguments(argv):
     group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
     group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
     group2.add_option("--replicate", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'all').")
-    group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group2)
 
     group3 = OptionGroup(parser, "Options for Adding Queues")
@@ -306,6 +307,16 @@ def OptionsAndArguments(argv):
         config._extra_arguments = opts.extra_arguments
     if opts.start_replica:
         config._start_replica = opts.start_replica
+
+    if opts.sasl_mechanism:
+        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+    if opts.ssl_certificate:
+        conn_options['ssl_certfile'] = opts.ssl_certificate
+    if opts.ssl_key:
+        conn_options['ssl_key'] = opts.ssl_key
+    if opts.ha_admin:
+        conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
     return args
 
 
@@ -355,11 +366,9 @@ class BrokerManager:
         self.conn       = None
         self.broker     = None
 
-    def SetBroker(self, brokerUrl, mechanism):
+    def SetBroker(self, brokerUrl):
         self.url = brokerUrl
-        client_properties={}
-        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
-        self.conn = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+        self.conn = Connection.establish(self.url, **conn_options)
         self.broker = BrokerAgent(self.conn)
 
     def Disconnect(self):
@@ -690,7 +699,7 @@ def main(argv=None):
     bm   = BrokerManager()
 
     try:
-        bm.SetBroker(config._host, config._sasl_mechanism)
+        bm.SetBroker(config._host)
         if len(args) == 0:
             bm.Overview()
         else:

Modified: qpid/branches/asyncstore/tools/src/py/qpid-ha
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-ha?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-ha (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-ha Fri Aug  3 12:13:32 2012
@@ -19,8 +19,7 @@
 # under the License.
 #
 
-import qmf.console, optparse, sys, time, os
-from qpid.management import managementChannel, managementClient
+import optparse, sys, time, os
 from qpid.messaging import Connection
 from qpid.messaging import Message as QpidMessage
 from qpidtoollibs.broker import BrokerAgent
@@ -32,29 +31,44 @@ except ImportError:
 # QMF address for the HA broker object.
 HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
 
+class ExitStatus(Exception):
+    """Raised if a command want's a non-0 exit status from the script"""
+    def __init__(self, status): self.status = status
+
 class Command:
     commands = {}
 
-    def __init__(self, name, help, args=[]):
+    def __init__(self, name, help, arg_names=[]):
         Command.commands[name] = self
         self.name = name
-        self.args = args
-        usage="%s [options] %s\n\n%s"%(name, " ".join(args), help)
+        self.arg_names = arg_names
+        usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
         self.help = help
         self.op=optparse.OptionParser(usage)
-        self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
-
-    def execute(self):
-        opts, args = self.op.parse_args()
-        if len(args) != len(self.args)+1:
+        self.op.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+        self.op.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+        self.op.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+        self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
+
+    def execute(self, args):
+        opts, args = self.op.parse_args(args)
+        if len(args) != len(self.arg_names)+1:
             self.op.print_help()
             raise Exception("Wrong number of arguments")
-        broker = opts.broker or "localhost:5672"
-        connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+        conn_options = {}
+        if opts.sasl_mechanism:
+            conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+        if opts.ssl_certificate:
+            conn_options['ssl_certfile'] = opts.ssl_certificate
+        if opts.ssl_key:
+            conn_options['ssl_key'] = opts.ssl_key
+        conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
+        connection = Connection.establish(opts.broker, **conn_options)
         qmf_broker = BrokerAgent(connection)
         ha_broker = qmf_broker.getHaBroker()
-        if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker)
-        try: return self.do_execute(qmf_broker, ha_broker, opts, args)
+        if not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker)
+        try: self.do_execute(qmf_broker, ha_broker, opts, args)
         finally: connection.close()
 
     def do_execute(self, qmf_broker, opts, args):
@@ -75,10 +89,10 @@ class StatusCmd(Command):
             help="Don't print status but return 0 if it matches <status>, 1 otherwise")
     def do_execute(self, qmf_broker, ha_broker, opts, args):
         if opts.expect:
-            if opts.expect != ha_broker.status: return 1
+            if opts.expect != ha_broker.status: raise ExitStatus(1)
         else:
             print ha_broker.status
-        return 0
+
 StatusCmd()
 
 class ReplicateCmd(Command):
@@ -93,30 +107,31 @@ class SetCmd(Command):
         Command.__init__(self, "set", "Set HA configuration settings")
         def add(optname, metavar, type, help):
             self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
-        add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other")
-        add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers")
+        add("--brokers-url", "<url>", "string", "URL with address of each broker in the cluster. Used by brokers to connect to each other.")
+        add("--public-url", "<url>", "string", "URL advertised to clients to connect to the cluster. May be a list or a VIP.")
         add("--backups", "<n>", "int", "Expect <n> backups to be running"),
 
     def do_execute(self, qmf_broker, ha_broker, opts, args):
-        if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
-        if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER)
+        if (opts.brokers_url): qmf_broker._method("setBrokersUrl", {"url":opts.brokers_url}, HA_BROKER)
+        if (opts.public_url): qmf_broker._method("setPublicUrl", {"url":opts.public_url}, HA_BROKER)
         if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER)
 
 SetCmd()
 
 class QueryCmd(Command):
     def __init__(self):
-        Command.__init__(self, "query", "Print HA configuration settings")
+        Command.__init__(self, "query", "Print HA configuration and status")
 
     def do_execute(self, qmf_broker, ha_broker, opts, args):
         hb = ha_broker
         for x in [("Status:", hb.status),
-                  ("Brokers URL:", hb.brokers),
-                  ("Public URL:", hb.publicBrokers),
+                  ("Brokers URL:", hb.brokersUrl),
+                  ("Public URL:", hb.publicUrl),
                   ("Expected Backups:", hb.expectedBackups),
                   ("Replicate: ", hb.replicateDefault)
                   ]:
             print "%-20s %s"%(x[0], x[1])
+
 QueryCmd()
 
 def print_usage(prog):
@@ -133,18 +148,25 @@ def find_command(args):
             return Command.commands[arg]
     return None
 
-def main(argv):
-    try:
-        args=argv[1:]
-        if args and args[0] == "--help-all":
-            for c in Command.commands.itervalues():
-                c.op.print_help(); print
-            return 1
+def main_except(argv):
+    """This version of main raises exceptions"""
+    args=argv[1:]
+    if args and args[0] == "--help-all":
+        for c in Command.commands.itervalues():
+            c.op.print_help(); print
+    else:
         command = find_command(args)
         if not command:
             print_usage(os.path.basename(argv[0]));
-            return 1;
-        if command.execute(): return 1
+            raise Exception("Command not found")
+        command.execute(args)
+
+def main(argv):
+    try:
+        main_except(argv)
+        return 0
+    except ExitStatus, e:
+        return e.status
     except Exception, e:
         print e
         return 1

Modified: qpid/branches/asyncstore/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-printevents?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-printevents (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-printevents Fri Aug  3 12:13:32 2012
@@ -21,34 +21,84 @@
 
 import os
 import optparse
-from optparse import IndentedHelpFormatter
 import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
+from optparse       import IndentedHelpFormatter
+from time           import time, strftime, gmtime, sleep
+from threading      import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import EventHelper
+
+
+class Printer(object):
+  """
+  This class serializes printed lines so that events coming from different
+  threads don't overlap each other.
+  """
+  def __init__(self):
+    self.lock = Lock()
 
-
-class EventConsole(Console):
-  def event(self, broker, event):
-    print event
-    sys.stdout.flush()
-
-  def brokerConnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+  def pr(self, text):
+    self.lock.acquire()
+    try:
+      print text
+    finally:
+      self.lock.release()
     sys.stdout.flush()
+  
 
-  def brokerConnectionFailed(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
-    sys.stdout.flush()
+class EventReceiver(Thread):
+  """
+  One instance of this class is created for each broker that is being monitored.
+  This class does not use the "reconnect" option because it needs to report as
+  events when the connection is established and when it's lost.
+  """
+  def __init__(self, printer, url, options):
+    Thread.__init__(self)
+    self.printer   = printer
+    self.url       = url
+    self.options   = options
+    self.running   = True
+    self.helper    = EventHelper()
+
+  def cancel(self):
+    self.running = False
+
+  def run(self):
+    isOpen = False
+    while self.running:
+      try:
+        conn = Connection.establish(self.url, **options)
+        isOpen = True
+        self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+        sess = conn.session()
+        rx = sess.receiver(self.helper.eventAddress())
+
+        while self.running:
+          try:
+            msg = rx.fetch(1)
+            event = self.helper.event(msg)
+            self.printer.pr(event.__repr__())
+            sess.acknowledge()
+          except qpid.messaging.exceptions.Empty:
+            pass
+        
+      except Exception, e:
+        if isOpen:
+          self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+        isOpen = False
+        sleep(1)
 
-  def brokerDisconnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
-    sys.stdout.flush()
 
 class JHelpFormatter(IndentedHelpFormatter):
-    """Format usage and description without stripping newlines from usage strings
     """
-
+    Format usage and description without stripping newlines from usage strings
+    """
     def format_usage(self, usage):
         return usage
 
@@ -82,21 +132,42 @@ def main(argv=None):
   p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
   p.add_option("--heartbeats", action="store_true", default=False, help="Use heartbeats.")
   p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+  p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+  p.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+  p.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
 
   options, arguments = p.parse_args(args=argv)
   if len(arguments) == 0:
     arguments.append("localhost")
 
-  console = EventConsole()
-  session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
-  brokers = []
+  brokers   = []
+  conn_options = {}
+  props = {}
+  printer   = Printer()
+
+  if options.sasl_mechanism:
+    conn_options['sasl_mechanisms'] = options.sasl_mechanism
+  if options.ssl_certificate:
+    conn_options['ssl_certfile'] = options.ssl_certificate
+  if options.ssl_key:
+    conn_options['ssl_key'] = options.ssl_key
+  if options.ha_admin:
+    props['qpid.ha-admin'] = 1
+  if options.heartbeats:
+    props['heartbeat'] = 5
+
+  if len(props) > 0:
+    conn_options['client_properties'] = props
+
   try:
     try:
       for host in arguments:
-        brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+        er = EventReceiver(printer, host, conn_options)
+        brokers.append(er)
+        er.start()
 
-        while (True):
-          sleep(10)
+      while (True):
+        sleep(10)
 
     except KeyboardInterrupt:
         print
@@ -106,9 +177,10 @@ def main(argv=None):
         print "Failed: %s - %s" % (e.__class__.__name__, e)
         return 1
   finally:
-    while len(brokers):
-      b = brokers.pop()
-      session.delBroker(b)
+    for b in brokers:
+      b.cancel()
+    for b in brokers:
+      b.join()
 
 if __name__ == '__main__':
   sys.exit(main())

Modified: qpid/branches/asyncstore/tools/src/py/qpid-queue-stats
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-queue-stats?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-queue-stats (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-queue-stats Fri Aug  3 12:13:32 2012
@@ -32,13 +32,13 @@ from qpid.connection import Connection, 
 from time            import sleep
 
 class BrokerManager(Console):
-  def __init__(self, host, mechanism):
+  def __init__(self, host, conn_options):
     self.url = host
     self.objects = {}
     self.filter  = None
     self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
                            userBindings=True, manageConnections=True)
-    self.broker  = self.session.addBroker(self.url, None, mechanism)
+    self.broker  = self.session.addBroker(self.url, **conn_options)
     self.firstError = True
 
   def setFilter(self,filter):
@@ -126,17 +126,23 @@ def main(argv=None):
   p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form:  [username/password@] hostname | ip-address [:<port>] \n ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
   p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
   p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
- 
+  p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
 
   options, arguments = p.parse_args(args=argv)
 
+  conn_options = {}
+  if options.sasl_mechanism:
+    conn_options['mechanisms'] = options.sasl_mechanism
+  if options.ssl_certificate:
+    conn_options['ssl_certfile'] = options.ssl_certificate
+
   host = options.broker_address
   filter = []
   if options.filter != None:
     for s in options.filter.split(","):
         filter.append(re.compile(s))
 
-  bm = BrokerManager(host, options.sasl_mechanism)
+  bm = BrokerManager(host, conn_options)
   bm.setFilter(filter)
   bm.Display()
  

Modified: qpid/branches/asyncstore/tools/src/py/qpid-route
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-route?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-route (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-route Fri Aug  3 12:13:32 2012
@@ -53,16 +53,15 @@ def Usage():
 
 class Config:
     def __init__(self):
-        self._verbose   = False
-        self._quiet     = False
-        self._durable   = False
-        self._dellink   = False
-        self._srclocal  = False
-        self._transport = "tcp"
-        self._ack       = 0
-        self._connTimeout = 10
-        self._client_sasl_mechanism = None
-        self._ha_admin  = False
+        self._verbose      = False
+        self._quiet        = False
+        self._durable      = False
+        self._dellink      = False
+        self._srclocal     = False
+        self._transport    = "tcp"
+        self._ack          = 0
+        self._connTimeout  = 10
+        self._conn_options = {}
 
 config = Config()
 
@@ -97,6 +96,7 @@ def OptionsAndArguments(argv):
     parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
 
     parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
+    parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
     parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     opts, encArgs = parser.parse_args(args=argv)
 
@@ -130,13 +130,16 @@ def OptionsAndArguments(argv):
         config._transport = opts.transport
 
     if opts.ha_admin:
-        config._ha_admin = True
+        config._conn_options['client_properties'] = {'qpid.ha-admin' : 1}
 
     if opts.ack:
         config._ack = opts.ack
 
     if opts.client_sasl_mechanism:
-        config._client_sasl_mechanism = opts.client_sasl_mechanism
+        config._conn_options['mechanisms'] = opts.client_sasl_mechanism
+
+    if opts.ssl_certificate:
+        config._conn_options['ssl_certfile'] = opts.ssl_certificate
 
     return args
 
@@ -147,9 +150,7 @@ class RouteManager:
         self.local = BrokerURL(localBroker)
         self.remote  = None
         self.qmf = Session()
-        client_properties = {}
-        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
-        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties)
+        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
         self.broker._waitForStable()
         self.agent = self.broker.getBrokerAgent()
 

Modified: qpid/branches/asyncstore/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-stat?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-stat (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-stat Fri Aug  3 12:13:32 2012
@@ -42,17 +42,26 @@ class Config:
         self._limit = 50
         self._increasing = False
         self._sortcol = None
-        self._sasl_mechanism = None
-        self._ha_admin = False
 
 config = Config()
+conn_options = {}
 
 def OptionsAndArguments(argv):
     """ Set global variables for options, return arguments """
 
     global config
+    global conn_options
 
-    parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]")
+    usage = \
+"""%prog -g [options]
+       %prog -c [options]
+       %prog -e [options]
+       %prog -q [options] [queue-name]
+       %prog -u [options]
+       %prog -m [options]
+       %prog --acl [options]"""
+
+    parser = OptionParser(usage=usage)
 
     group1 = OptionGroup(parser, "General Options")
     group1.add_option("-b", "--broker",  action="store", type="string", default="localhost", metavar="<url>",
@@ -61,10 +70,12 @@ def OptionsAndArguments(argv):
                       help="Maximum time to wait for broker connection (in seconds)")
     group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
                       help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
     group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group1)
 
-    group2 = OptionGroup(parser, "Display Options")
+    group2 = OptionGroup(parser, "Command Options")
     group2.add_option("-g", "--general", help="Show General Broker Stats",  action="store_const", const="g",   dest="show")
     group2.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
     group2.add_option("-e", "--exchanges", help="Show Exchanges",           action="store_const", const="e",   dest="show")
@@ -72,12 +83,14 @@ def OptionsAndArguments(argv):
     group2.add_option("-u", "--subscriptions", help="Show Subscriptions",   action="store_const", const="u",   dest="show")
     group2.add_option("-m", "--memory", help="Show Broker Memory Stats",    action="store_const", const="m",   dest="show")
     group2.add_option(      "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
-    group2.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
-    group2.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
-    group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
-
     parser.add_option_group(group2)
 
+    group3 = OptionGroup(parser, "Display Options")
+    group3.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
+    group3.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
+    group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
+    parser.add_option_group(group3)
+
     opts, args = parser.parse_args(args=argv)
 
     if not opts.show:
@@ -89,8 +102,15 @@ def OptionsAndArguments(argv):
     config._connTimeout = opts.timeout
     config._increasing = opts.increasing
     config._limit = opts.limit
-    config._sasl_mechanism = opts.sasl_mechanism
-    config._ha_admin = opts.ha_admin
+
+    if opts.sasl_mechanism:
+        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+    if opts.ssl_certificate:
+        conn_options['ssl_certfile'] = opts.ssl_certificate
+    if opts.ssl_key:
+        conn_options['ssl_key'] = opts.ssl_key
+    if opts.ha_admin:
+        conn_options['client_properties'] = {'qpid.ha-admin' : 1}
 
     return args
 
@@ -126,11 +146,9 @@ class BrokerManager:
         self.broker     = None
         self.cluster    = None
 
-    def SetBroker(self, brokerUrl, mechanism):
+    def SetBroker(self, brokerUrl):
         self.url = brokerUrl
-        client_properties={}
-        if config._ha_admin: client_properties["qpid.ha-admin"] = 1
-        self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+        self.connection = Connection.establish(self.url, **conn_options)
         self.broker = BrokerAgent(self.connection)
 
     def Disconnect(self):
@@ -235,9 +253,10 @@ class BrokerManager:
     def displayConn(self):
         disp = Display(prefix="  ")
         heads = []
-        heads.append(Header('client-addr'))
+        heads.append(Header('connection'))
         heads.append(Header('cproc'))
         heads.append(Header('cpid'))
+        heads.append(Header('mech'))
         heads.append(Header('auth'))
         heads.append(Header('connected', Header.DURATION))
         heads.append(Header('idle', Header.DURATION))
@@ -251,6 +270,7 @@ class BrokerManager:
             row.append(conn.address)
             row.append(conn.remoteProcessName)
             row.append(conn.remotePid)
+            row.append(conn.saslMechanism)
             row.append(conn.authIdentity)
             row.append(broker.getUpdateTime() - conn.getCreateTime())
             row.append(broker.getUpdateTime() - conn.getUpdateTime())
@@ -416,7 +436,8 @@ class BrokerManager:
         heads.append(Header("acked", Header.Y))
         heads.append(Header("excl", Header.Y))
         heads.append(Header("creditMode"))
-        heads.append(Header("delivered", Header.KMG))
+        heads.append(Header("delivered", Header.COMMAS))
+        heads.append(Header("sessUnacked", Header.COMMAS))
         rows = []
         subscriptions = self.broker.getAllSubscriptions()
         sessions = self.getSessionMap()
@@ -436,6 +457,7 @@ class BrokerManager:
                 row.append(s.exclusive)
                 row.append(s.creditMode)
                 row.append(s.delivered)
+                row.append(session.unackedMessages)
                 rows.append(row)
             except:
                 pass
@@ -524,7 +546,7 @@ def main(argv=None):
     bm   = BrokerManager()
 
     try:
-        bm.SetBroker(config._host, config._sasl_mechanism)
+        bm.SetBroker(config._host)
         bm.display(args)
         bm.Disconnect()
         return 0

Modified: qpid/branches/asyncstore/tools/src/py/qpid-tool
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpid-tool?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpid-tool (original)
+++ qpid/branches/asyncstore/tools/src/py/qpid-tool Fri Aug  3 12:13:32 2012
@@ -173,11 +173,11 @@ class Mcli(Cmd):
 class QmfData(Console):
   """
   """
-  def __init__(self, disp, url):
+  def __init__(self, disp, url, cert):
     self.disp = disp
     self.url = url
     self.session = Session(self, manageConnections=True)
-    self.broker = self.session.addBroker(self.url)
+    self.broker = self.session.addBroker(self.url, ssl_certfile=cert)
     self.lock = Lock()
     self.connected = None
     self.closing = None
@@ -455,6 +455,7 @@ class QmfData(Console):
           rows.append(row)
       else:
         print "No object found with ID %d" % dispId
+        return
     finally:
       self.lock.release()
     self.disp.table(caption, heads, rows)
@@ -723,10 +724,13 @@ if _host[0] == '-':
   sys.exit(1)
 
 disp = Display()
+cert = None
+if len(cargs) > 1:
+  cert = cargs[1]
 
 # Attempt to make a connection to the target broker
 try:
-  data = QmfData(disp, _host)
+  data = QmfData(disp, _host, cert)
 except Exception, e:
   if str(e).find("Exchange not found") != -1:
     print "Management not enabled on broker:  Use '-m yes' option on broker startup."

Modified: qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py (original)
+++ qpid/branches/asyncstore/tools/src/py/qpidtoollibs/broker.py Fri Aug  3 12:13:32 2012
@@ -18,6 +18,7 @@
 #
 
 from qpid.messaging import Message
+from qpidtoollibs.disp import TimeLong
 try:
   from uuid import uuid4
 except ImportError:
@@ -190,9 +191,13 @@ class BrokerAgent(object):
   def getAcl(self):
     return self._getSingleObject(Acl)
 
-  def echo(self, sequence, body):
+  def getMemory(self):
+    return self._getSingleObject(Memory)
+
+  def echo(self, sequence = 1, body = "Body"):
     """Request a response to test the path to the management broker"""
-    pass
+    args = {'sequence' : sequence, 'body' : body}
+    return self._method('echo', args)
 
   def connect(self, host, port, durable, authMechanism, username, password, transport):
     """Establish a connection to another broker"""
@@ -295,6 +300,41 @@ class BrokerAgent(object):
     return self._getBrokerObject(self, _type, oid)
 
 
+class EventHelper(object):
+  def eventAddress(self, pkg='*', cls='*', sev='*'):
+    return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
+
+  def event(self, msg):
+    return BrokerEvent(msg)
+
+
+class BrokerEvent(object):
+  def __init__(self, msg):
+    self.msg = msg
+    self.content = msg.content[0]
+    self.values = self.content['_values']
+    self.schema_id = self.content['_schema_id']
+    self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
+
+  def __repr__(self):
+    rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
+    for k,v in self.values.items():
+      rep = rep + " %s=%s" % (k, v)
+    return rep
+
+  def __getattr__(self, key):
+    if key not in self.values:
+      return None
+    value = self.values[key]
+    return value
+
+  def getAttributes(self):
+    return self.values
+
+  def getTimestamp(self):
+    return self.content['_timestamp']
+
+
 class BrokerObject(object):
   def __init__(self, broker, content):
     self.broker = broker
@@ -362,7 +402,7 @@ class Connection(BrokerObject):
     BrokerObject.__init__(self, broker, values)
 
   def close(self):
-    pass
+    self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
 
 class Session(BrokerObject):
   def __init__(self, broker, values):

Modified: qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py (original)
+++ qpid/branches/asyncstore/tools/src/py/qpidtoollibs/disp.py Fri Aug  3 12:13:32 2012
@@ -167,7 +167,10 @@ class Display:
     for head in heads:
       width = len (head)
       for row in rows:
-        cellWidth = len (unicode (row[col]))
+        text = row[col]
+        if text.__class__ == str:
+          text = text.decode('utf-8')
+        cellWidth = len(unicode(text))
         if cellWidth > width:
           width = cellWidth
       colWidth.append (width + self.tableSpacing)
@@ -187,9 +190,12 @@ class Display:
       line = self.tablePrefix
       col  = 0
       for width in colWidth:
-        line = line + unicode (row[col])
+        text = row[col]
+        if text.__class__ == str:
+          text = text.decode('utf-8')
+        line = line + unicode(text)
         if col < len (heads) - 1:
-          for i in range (width - len (unicode (row[col]))):
+          for i in range (width - len(unicode(text))):
             line = line + " "
         col = col + 1
       print line

Modified: qpid/branches/asyncstore/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/branches/asyncstore/wcf/src/Apache/Qpid/Interop/InputLink.cpp Fri Aug  3 12:13:32 2012
@@ -21,6 +21,7 @@
 #include <msclr\lock.h>
 
 #include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FieldValue.h"
 #include "qpid/framing/FrameSet.h"
 #include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Connection.h"



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org