You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/09 02:41:02 UTC

svn commit: r897362 - /incubator/cassandra/trunk/contrib/py_stress/stress.py

Author: jbellis
Date: Sat Jan  9 01:41:01 2010
New Revision: 897362

URL: http://svn.apache.org/viewvc?rev=897362&view=rev
Log:
add latency tracking to stress.py.  patch by jbellis

Modified:
    incubator/cassandra/trunk/contrib/py_stress/stress.py

Modified: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=897362&r1=897361&r2=897362&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (original)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Sat Jan  9 01:41:01 2010
@@ -132,16 +132,24 @@
 
 
 class Operation(Thread):
-    def __init__(self, i, counts):
+    def __init__(self, i, counts, latencies):
         Thread.__init__(self)
+        # generator of the keys to be used
         self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
+        # we can't use a local counter, since that won't be visible to the parent
+        # under multiprocessing.  instead, the parent passes a "counts" array
+        # and an index that is our assigned counter.
         self.idx = i
         self.counts = counts
+        # similarly, a shared array for latency totals
+        self.latencies = latencies
+        # random host for pseudo-load-balancing
         [hostname] = random.sample(nodes, 1)
-        self.cclient = get_client(host=hostname,port=options.port,
-                                  framed=options.framed)
+        # open client
+        self.cclient = get_client(hostname, options.port, options.framed)
         self.cclient.transport.open()
 
+
 class Inserter(Operation):
     def run(self):
         data = md5(str(get_ident())).hexdigest()
@@ -154,6 +162,7 @@
                 cfmap= {'Super1': [ColumnOrSuperColumn(super_column=s) for s in supers]}
             else:
                 cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}
+            start = time.time()
             try:
                 self.cclient.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
             except KeyboardInterrupt:
@@ -163,7 +172,9 @@
                     print e
                 else:
                     raise
-            self.counts[self.idx]=self.counts[self.idx]+1
+            self.latencies[self.idx] += time.time() - start
+            self.counts[self.idx] += 1
+
 
 class Reader(Operation):
     def run(self):
@@ -173,6 +184,7 @@
                 key = str(key_generator())
                 for j in xrange(supers_per_key):
                     parent = ColumnParent('Super1', chr(ord('A') + j))
+                    start = time.time()
                     try:
                         r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
                         if not r: raise RuntimeError("Key %s not found" % key)
@@ -183,11 +195,13 @@
                             print e
                         else:
                             raise
-                    self.counts[self.idx]=self.counts[self.idx]+1
+                    self.latencies[self.idx] += time.time() - start
+                    self.counts[self.idx] += 1
         else:
             parent = ColumnParent('Standard1')
             for i in xrange(keys_per_thread):
                 key = str(key_generator())
+                start = time.time()
                 try:
                     r = self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
                     if not r: raise RuntimeError("Key %s not found" % key)
@@ -198,25 +212,29 @@
                         print e
                     else:
                         raise
-                self.counts[self.idx]=self.counts[self.idx]+1
+                self.latencies[self.idx] += time.time() - start
+                self.counts[self.idx] += 1
+
 
 class OperationFactory:
     @staticmethod
-    def create(type,i,counts):
+    def create(type, i, counts, latencies):
         if type == 'read':
-            return Reader(i, counts)
+            return Reader(i, counts, latencies)
         elif type == 'insert':
-            return Inserter(i, counts)
+            return Inserter(i, counts, latencies)
         else:
             raise RuntimeError, 'Unsupported op!'
 
+
 class Stress(object):
-    counts = array('i', [0]*n_threads)
+    counts = array('i', [0] * n_threads)
+    latencies = array('d', [0] * n_threads)
 
     def create_threads(self,type):
         threads = []
         for i in xrange(n_threads):
-            th = OperationFactory.create(type,i, self.counts)
+            th = OperationFactory.create(type, i, self.counts, self.latencies)
             threads.append(th)
             th.start()
         return threads
@@ -227,15 +245,18 @@
             outf = open(filename,'w')
         else:
             outf = sys.stdout
-        outf.write('total,interval_op_rate,elapsed_time\n')
-        total = old_total = 0
+        outf.write('total,interval_op_rate,avg_latency,elapsed_time\n')
+        total = old_total = latency = old_latency = 0
         while True:
             time.sleep(options.interval)
-            old_total = total
+            old_total, old_latency = total, latency
             total = sum(self.counts[th.idx] for th in threads)
+            latency = sum(self.latencies[th.idx] for th in threads)
             delta = total - old_total
-            elapsed_t = int(time.time()-start_t)
-            outf.write('%d,%d,%d\n' % (total, delta / options.interval,elapsed_t))
+            delta_latency = latency - old_latency
+            elapsed_t = int(time.time() - start_t)
+            outf.write('%d,%d,%f,%d\n' 
+                       % (total, delta / options.interval, delta_latency / delta, elapsed_t))
             if not [th for th in threads if th.isAlive()]:
                 break
 
@@ -247,6 +268,7 @@
         threads = self.create_threads('read')
         self.run_test(options.file,threads);
         
+
 stresser = Stress()
 benchmark = getattr(stresser, options.operation, None)
 if not have_multiproc: