You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/22 21:43:29 UTC
svn commit: r915052 - /incubator/cassandra/trunk/contrib/py_stress/stress.py
Author: jbellis
Date: Mon Feb 22 20:43:29 2010
New Revision: 915052
URL: http://svn.apache.org/viewvc?rev=915052&view=rev
Log:
add key throughput as well as op throughput, for the benefit of get_range_slice. patch by Brandon Williams; reviewed by jbellis for CASSANDRA-820
Modified:
incubator/cassandra/trunk/contrib/py_stress/stress.py
Modified: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=915052&r1=915051&r2=915052&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (original)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Mon Feb 22 20:43:29 2010
@@ -28,7 +28,7 @@
from threading import Thread
from thread import get_ident
from array import array
-from hashlib import md5
+from md5 import md5
import time, random, sys, os
from random import randint, gauss
from optparse import OptionParser
@@ -138,17 +138,18 @@
class Operation(Thread):
- def __init__(self, i, counts, latencies):
+ def __init__(self, i, opcounts, keycounts, latencies):
Thread.__init__(self)
# generator of the keys to be used
self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
# we can't use a local counter, since that won't be visible to the parent
- # under multiprocessing. instead, the parent passes a "counts" array
+ # under multiprocessing. instead, the parent passes an "opcounts" array
# and an index that is our assigned counter.
self.idx = i
- self.counts = counts
- # similarly, a shared array for latency totals
+ self.opcounts = opcounts
+ # similarly, shared arrays for latency and key totals
self.latencies = latencies
+ self.keycounts = keycounts
# random host for pseudo-load-balancing
[hostname] = random.sample(nodes, 1)
# open client
@@ -180,7 +181,8 @@
else:
raise
self.latencies[self.idx] += time.time() - start
- self.counts[self.idx] += 1
+ self.opcounts[self.idx] += 1
+ self.keycounts[self.idx] += 1
class Reader(Operation):
@@ -203,7 +205,8 @@
else:
raise
self.latencies[self.idx] += time.time() - start
- self.counts[self.idx] += 1
+ self.opcounts[self.idx] += 1
+ self.keycounts[self.idx] += 1
else:
parent = ColumnParent('Standard1')
for i in xrange(keys_per_thread):
@@ -220,7 +223,8 @@
else:
raise
self.latencies[self.idx] += time.time() - start
- self.counts[self.idx] += 1
+ self.opcounts[self.idx] += 1
+ self.keycounts[self.idx] += 1
class RangeSlicer(Operation):
def run(self):
@@ -249,9 +253,10 @@
else:
raise
self.latencies[self.idx] += time.time() - begin
- self.counts[self.idx] += 1
+ self.opcounts[self.idx] += 1
current += len(r) + 1
- last += len(r)
+ last += current + options.rangecount
+ self.keycounts[self.idx] += len(r)
else:
parent = ColumnParent('Standard1')
while current < end:
@@ -267,34 +272,37 @@
if options.ignore:
print e
else:
+ print start, finish
raise
current += len(r) + 1
- last += len(r)
+ last += current + options.rangecount
self.latencies[self.idx] += time.time() - begin
- self.counts[self.idx] += 1
+ self.opcounts[self.idx] += 1
+ self.keycounts[self.idx] += len(r)
class OperationFactory:
@staticmethod
- def create(type, i, counts, latencies):
+ def create(type, i, opcounts, keycounts, latencies):
if type == 'read':
- return Reader(i, counts, latencies)
+ return Reader(i, opcounts, keycounts, latencies)
elif type == 'insert':
- return Inserter(i, counts, latencies)
+ return Inserter(i, opcounts, keycounts, latencies)
elif type == 'rangeslice':
- return RangeSlicer(i, counts, latencies)
+ return RangeSlicer(i, opcounts, keycounts, latencies)
else:
raise RuntimeError, 'Unsupported op!'
class Stress(object):
- counts = array('i', [0] * n_threads)
+ opcounts = array('i', [0] * n_threads)
latencies = array('d', [0] * n_threads)
+ keycounts = array('i', [0] * n_threads)
def create_threads(self,type):
threads = []
for i in xrange(n_threads):
- th = OperationFactory.create(type, i, self.counts, self.latencies)
+ th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies)
threads.append(th)
th.start()
return threads
@@ -305,19 +313,24 @@
outf = open(filename,'w')
else:
outf = sys.stdout
- outf.write('total,interval_op_rate,avg_latency,elapsed_time\n')
- total = old_total = latency = old_latency = 0
+ outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n')
+ total = old_total = latency = keycount = old_keycount = old_latency = 0
while True:
time.sleep(options.interval)
- old_total, old_latency = total, latency
- total = sum(self.counts[th.idx] for th in threads)
+ old_total, old_latency, old_keycount = total, latency, keycount
+ total = sum(self.opcounts[th.idx] for th in threads)
latency = sum(self.latencies[th.idx] for th in threads)
- delta = total - old_total
+ keycount = sum(self.keycounts[th.idx] for th in threads)
+ opdelta = total - old_total
+ keydelta = keycount - old_keycount
delta_latency = latency - old_latency
- delta_formatted = (delta_latency / delta) if delta > 0 else 'NAN'
+ if opdelta > 0:
+ delta_formatted = (delta_latency / opdelta)
+ else:
+ delta_formatted = 'NaN'
elapsed_t = int(time.time() - start_t)
- outf.write('%d,%d,%s,%d\n'
- % (total, delta / options.interval, delta_formatted, elapsed_t))
+ outf.write('%d,%d,%d,%s,%d\n'
+ % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t))
if not [th for th in threads if th.isAlive()]:
break