You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/11/21 19:46:35 UTC

svn commit: r1640975 - /qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark

Author: aconway
Date: Fri Nov 21 18:46:35 2014
New Revision: 1640975

URL: http://svn.apache.org/r1640975
Log:
NO-JIRA: Fix qpid-cpp-benchmark to support AMQP 1.0.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1640975&r1=1640974&r2=1640975&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Fri Nov 21 18:46:35 2014
@@ -18,7 +18,14 @@
 # under the License.
 #
 
-import optparse, time, qpid.messaging, re, os
+import optparse, time, re, os
+
+try:
+    import qpid_messaging as qm
+except ImportError:
+    qpid_messaging = None
+    import qpid.messaging as qm
+
 from threading import Thread
 from subprocess import Popen, PIPE, STDOUT
 
@@ -46,6 +53,8 @@ op.add_option("--content-size", default=
               help="message size in bytes (default %default)")
 op.add_option("--ack-frequency", default=100, metavar="N", type="int",
               help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--tx", default=0, metavar="N", type="int",
+              help="Transaction batch size, 0 means no transactions")
 op.add_option("--no-report-header", dest="report_header", default=True,
               action="store_false", help="don't print header on report")
 op.add_option("--summarize", default=False, action="store_true",
@@ -73,8 +82,6 @@ op.add_option("--save-received", default
               help="Save received message content to files <queuename>-receiver-<n>.msg")
 op.add_option("--verbose", default=False, action="store_true",
               help="Show commands executed")
-op.add_option("--no-delete", default=False, action="store_true",
-              help="Don't delete the test queues.")
 op.add_option("--fill-drain", default=False, action="store_true",
               help="First fill the queues, then drain them")
 op.add_option("--qpid-send-path", default="", type="str", metavar="PATH",
@@ -129,7 +136,8 @@ def start_receive(queue, index, opts, re
                "--report-total",
                "--ack-frequency", str(opts.ack_frequency),
                "--ready-address", "%s;{create:always}"%ready_queue,
-               "--report-header=no"
+               "--report-header=no",
+               "--tx=%s" % opts.tx
                ]
     if opts.save_received:
         command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
@@ -152,7 +160,8 @@ def start_send(queue, opts, broker, host
                "--report-header=no",
                "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
                "--sequence=%s"%(opts.sequence and "yes" or "no"),
-               "--durable", str(opts.durable)
+               "--durable=%d" % opts.durable,
+               "--tx=%s" % opts.tx
                ]
     command += opts.send_arg
     if opts.connection_options:
@@ -170,17 +179,50 @@ def first_line(p):
         raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
     return out.split("\n")[0]
 
-def recreate_queues(queues, brokers, no_delete, opts):
-    c = qpid.messaging.Connection(brokers[0])
-    c.open()
-    s = c.session()
+def connect(broker, opts):
+    """Return qm.Connection.establish(broker, ...) with opts.connection_options ("{k:v,...}") as keyword args."""
+    copts = {}
+    if opts.connection_options:  # split on the first ':' only, so values may themselves contain ':' (e.g. host:port)
+        copts = dict([kv.strip().split(":", 1) for kv in opts.connection_options.strip("{}").split(",")])
+    return qm.Connection.establish(broker, **copts)
+
+def drain(queue, session, opts):
+    """
+    Drain a queue to make sure it is empty. Throw away the messages.
+    Acknowledges every 500 messages, then once more for the final partial batch.
+    """
+    if opts.verbose: print "Draining", queue
+    r = session.receiver(queue, capacity=1000)
+    n = 0
+    try:
+        while True:
+            r.fetch(timeout=0)  # qm.Empty from this call is the only way out of the loop
+            n += 1
+            if n % 500 == 0: r.session.acknowledge()
+    except qm.Empty: pass
+    r.session.acknowledge()  # was unreachable inside the try; ack the tail batch before closing
+    r.close()
+    if opts.verbose: print "Drained", queue, n
+
+def clear_queues(queues, brokers, opts):  # Delete and re-declare each queue on brokers[0]; drain it when delete is refused.
+    c = connect(brokers[0], opts)
     for q in queues:
-        if not no_delete:
-            try: s.sender("%s;{delete:always}"%(q)).close()
-            except qpid.messaging.exceptions.NotFound: pass
-        address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
-        if opts.verbose: print "Creating", address
+        s = c.session()  # a new session for every queue
+        need_drain = False
+        try:
+            s.sender("%s;{delete:always}"%(q)).close()  # a delete:always sender, closed at once, is used to remove the queue
+            if opts.verbose: print "Deleted", q
+        except qm.NotFound:  # no such queue, so there was nothing to delete
+            s = c.session()  # NOTE(review): presumably the old session is unusable after the error - confirm
+        except qm.AddressError:
+            need_drain = True # AMQP 1.0 does not support delete, drain instead.
+            s = c.session()  # NOTE(review): same assumption as the NotFound branch - confirm
+        address_opts = ["create:always"]
+        if opts.durable: address_opts += ["node:{durable:true}"]
+        address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts))  # user create options come first
+        if opts.verbose: print "Declaring", address
         s.sender(address)
+        if need_drain: drain(q, s, opts)  # the queue survived the delete attempt, so empty it instead
     c.close()
 
 def print_header(timestamp):
@@ -225,19 +267,18 @@ def print_summary(send_stats, recv_stats
 
 class ReadyReceiver:
     """A receiver for ready messages"""
-    def __init__(self, queue, broker):
-        self.connection = qpid.messaging.Connection(broker)
-        self.connection.open()
-        self.receiver = self.connection.session().receiver(
-            "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+    def __init__(self, queue, broker, opts):
+        self.connection = connect(broker, opts)
+        self.receiver = self.connection.session().receiver(queue)
         self.receiver.session.sync()
         self.timeout=10
 
     def wait(self, receivers):
         try:
             for i in receivers: self.receiver.fetch(self.timeout)
+            self.receiver.session.acknowledge()
             self.connection.close()
-        except qpid.messaging.Empty:
+        except qm.Empty:
             for r in receivers:
                 if (r.poll() is not None):
                     out,err=r.communicate()
@@ -275,8 +316,8 @@ def main():
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
     try:
         for i in xrange(opts.repeat):
-            recreate_queues(queues, opts.broker, opts.no_delete, opts)
-            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+            clear_queues(queues+[ready_queue], opts.broker, opts)
+            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts)
 
             def start_receivers():
                 return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org