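You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink labeled "here" on the original archive page; the link was lost when this text was extracted.)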
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/09 02:41:02 UTC
svn commit: r897362 - /incubator/cassandra/trunk/contrib/py_stress/stress.py
Author: jbellis
Date: Sat Jan 9 01:41:01 2010
New Revision: 897362
URL: http://svn.apache.org/viewvc?rev=897362&view=rev
Log:
add latency tracking to stress.py. patch by jbellis
Modified:
incubator/cassandra/trunk/contrib/py_stress/stress.py
Modified: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=897362&r1=897361&r2=897362&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (original)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Sat Jan 9 01:41:01 2010
@@ -132,16 +132,24 @@
class Operation(Thread):
- def __init__(self, i, counts):
+ def __init__(self, i, counts, latencies):
Thread.__init__(self)
+ # generator of the keys to be used
self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
+ # we can't use a local counter, since that won't be visible to the parent
+ # under multiprocessing. instead, the parent passes a "counts" array
+ # and an index that is our assigned counter.
self.idx = i
self.counts = counts
+ # similarly, a shared array for latency totals
+ self.latencies = latencies
+ # random host for pseudo-load-balancing
[hostname] = random.sample(nodes, 1)
- self.cclient = get_client(host=hostname,port=options.port,
- framed=options.framed)
+ # open client
+ self.cclient = get_client(hostname, options.port, options.framed)
self.cclient.transport.open()
+
class Inserter(Operation):
def run(self):
data = md5(str(get_ident())).hexdigest()
@@ -154,6 +162,7 @@
cfmap= {'Super1': [ColumnOrSuperColumn(super_column=s) for s in supers]}
else:
cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}
+ start = time.time()
try:
self.cclient.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
except KeyboardInterrupt:
@@ -163,7 +172,9 @@
print e
else:
raise
- self.counts[self.idx]=self.counts[self.idx]+1
+ self.latencies[self.idx] += time.time() - start
+ self.counts[self.idx] += 1
+
class Reader(Operation):
def run(self):
@@ -173,6 +184,7 @@
key = str(key_generator())
for j in xrange(supers_per_key):
parent = ColumnParent('Super1', chr(ord('A') + j))
+ start = time.time()
try:
r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
if not r: raise RuntimeError("Key %s not found" % key)
@@ -183,11 +195,13 @@
print e
else:
raise
- self.counts[self.idx]=self.counts[self.idx]+1
+ self.latencies[self.idx] += time.time() - start
+ self.counts[self.idx] += 1
else:
parent = ColumnParent('Standard1')
for i in xrange(keys_per_thread):
key = str(key_generator())
+ start = time.time()
try:
r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
if not r: raise RuntimeError("Key %s not found" % key)
@@ -198,25 +212,29 @@
print e
else:
raise
- self.counts[self.idx]=self.counts[self.idx]+1
+ self.latencies[self.idx] += time.time() - start
+ self.counts[self.idx] += 1
+
class OperationFactory:
@staticmethod
- def create(type,i,counts):
+ def create(type, i, counts, latencies):
if type == 'read':
- return Reader(i, counts)
+ return Reader(i, counts, latencies)
elif type == 'insert':
- return Inserter(i, counts)
+ return Inserter(i, counts, latencies)
else:
raise RuntimeError, 'Unsupported op!'
+
class Stress(object):
- counts = array('i', [0]*n_threads)
+ counts = array('i', [0] * n_threads)
+ latencies = array('d', [0] * n_threads)
def create_threads(self,type):
threads = []
for i in xrange(n_threads):
- th = OperationFactory.create(type,i, self.counts)
+ th = OperationFactory.create(type, i, self.counts, self.latencies)
threads.append(th)
th.start()
return threads
@@ -227,15 +245,18 @@
outf = open(filename,'w')
else:
outf = sys.stdout
- outf.write('total,interval_op_rate,elapsed_time\n')
- total = old_total = 0
+ outf.write('total,interval_op_rate,avg_latency,elapsed_time\n')
+ total = old_total = latency = old_latency = 0
while True:
time.sleep(options.interval)
- old_total = total
+ old_total, old_latency = total, latency
total = sum(self.counts[th.idx] for th in threads)
+ latency = sum(self.latencies[th.idx] for th in threads)
delta = total - old_total
- elapsed_t = int(time.time()-start_t)
- outf.write('%d,%d,%d\n' % (total, delta / options.interval,elapsed_t))
+ delta_latency = latency - old_latency
+ elapsed_t = int(time.time() - start_t)
+ outf.write('%d,%d,%f,%d\n'
+ % (total, delta / options.interval, delta_latency / delta, elapsed_t))
if not [th for th in threads if th.isAlive()]:
break
@@ -247,6 +268,7 @@
threads = self.create_threads('read')
self.run_test(options.file,threads);
+
stresser = Stress()
benchmark = getattr(stresser, options.operation, None)
if not have_multiproc: