You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/11/21 19:46:35 UTC
svn commit: r1640975 - /qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
Author: aconway
Date: Fri Nov 21 18:46:35 2014
New Revision: 1640975
URL: http://svn.apache.org/r1640975
Log:
NO-JIRA: Fix qpid-cpp-benchmark to support AMQP 1.0.
Modified:
qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1640975&r1=1640974&r2=1640975&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Fri Nov 21 18:46:35 2014
@@ -18,7 +18,14 @@
# under the License.
#
-import optparse, time, qpid.messaging, re, os
+import optparse, time, re, os
+
+try:
+ import qpid_messaging as qm
+except ImportError:
+ qpid_messaging = None
+ import qpid.messaging as qm
+
from threading import Thread
from subprocess import Popen, PIPE, STDOUT
@@ -46,6 +53,8 @@ op.add_option("--content-size", default=
help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=100, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--tx", default=0, metavar="N", type="int",
+ help="Transaction batch size, 0 means no transactions")
op.add_option("--no-report-header", dest="report_header", default=True,
action="store_false", help="don't print header on report")
op.add_option("--summarize", default=False, action="store_true",
@@ -73,8 +82,6 @@ op.add_option("--save-received", default
help="Save received message content to files <queuename>-receiver-<n>.msg")
op.add_option("--verbose", default=False, action="store_true",
help="Show commands executed")
-op.add_option("--no-delete", default=False, action="store_true",
- help="Don't delete the test queues.")
op.add_option("--fill-drain", default=False, action="store_true",
help="First fill the queues, then drain them")
op.add_option("--qpid-send-path", default="", type="str", metavar="PATH",
@@ -129,7 +136,8 @@ def start_receive(queue, index, opts, re
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
"--ready-address", "%s;{create:always}"%ready_queue,
- "--report-header=no"
+ "--report-header=no",
+ "--tx=%s" % opts.tx
]
if opts.save_received:
command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
@@ -152,7 +160,8 @@ def start_send(queue, opts, broker, host
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
"--sequence=%s"%(opts.sequence and "yes" or "no"),
- "--durable", str(opts.durable)
+ "--durable=%d" % opts.durable,
+ "--tx=%s" % opts.tx
]
command += opts.send_arg
if opts.connection_options:
@@ -170,17 +179,50 @@ def first_line(p):
raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def recreate_queues(queues, brokers, no_delete, opts):
- c = qpid.messaging.Connection(brokers[0])
- c.open()
- s = c.session()
+def connect(broker, opts):
+ if opts.connection_options:
+ copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")])
+ else:
+ copts = {}
+ return qm.Connection.establish(broker, **copts)
+
+def drain(queue, session, opts):
+ """
+ Drain a queue to make sure it is empty. Throw away the messages.
+ """
+ if opts.verbose: print "Draining", queue
+ r = session.receiver(queue, capacity=1000)
+ n = 0
+ try:
+ while True:
+ r.fetch(timeout=0)
+ n += 1
+ if n % 500 == 0: r.session.acknowledge()
+ r.session.acknowledge()
+ except qm.Empty:
+ pass
+ r.close()
+ if opts.verbose: print "Drained", queue, n
+
+def clear_queues(queues, brokers, opts):
+ c = connect(brokers[0], opts)
for q in queues:
- if not no_delete:
- try: s.sender("%s;{delete:always}"%(q)).close()
- except qpid.messaging.exceptions.NotFound: pass
- address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
- if opts.verbose: print "Creating", address
+ s = c.session()
+ need_drain = False
+ try:
+ s.sender("%s;{delete:always}"%(q)).close()
+ if opts.verbose: print "Deleted", q
+ except qm.NotFound:
+ s = c.session()
+ except qm.AddressError:
+ need_drain = True # AMQP 1.0 does not support delete, drain instead.
+ s = c.session()
+ address_opts = ["create:always"]
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts))
+ if opts.verbose: print "Declaring", address
s.sender(address)
+ if need_drain: drain(q, s, opts)
c.close()
def print_header(timestamp):
@@ -225,19 +267,18 @@ def print_summary(send_stats, recv_stats
class ReadyReceiver:
"""A receiver for ready messages"""
- def __init__(self, queue, broker):
- self.connection = qpid.messaging.Connection(broker)
- self.connection.open()
- self.receiver = self.connection.session().receiver(
- "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+ def __init__(self, queue, broker, opts):
+ self.connection = connect(broker, opts)
+ self.receiver = self.connection.session().receiver(queue)
self.receiver.session.sync()
self.timeout=10
def wait(self, receivers):
try:
for i in receivers: self.receiver.fetch(self.timeout)
+ self.receiver.session.acknowledge()
self.connection.close()
- except qpid.messaging.Empty:
+ except qm.Empty:
for r in receivers:
if (r.poll() is not None):
out,err=r.communicate()
@@ -275,8 +316,8 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- recreate_queues(queues, opts.broker, opts.no_delete, opts)
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ clear_queues(queues+[ready_queue], opts.broker, opts)
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts)
def start_receivers():
return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org