You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/22 21:43:29 UTC

svn commit: r915052 - /incubator/cassandra/trunk/contrib/py_stress/stress.py

Author: jbellis
Date: Mon Feb 22 20:43:29 2010
New Revision: 915052

URL: http://svn.apache.org/viewvc?rev=915052&view=rev
Log:
add key throughput as well as op throughput, for the benefit of get_range_slice.  patch by Brandon Williams; reviewed by jbellis for CASSANDRA-820

Modified:
    incubator/cassandra/trunk/contrib/py_stress/stress.py

Modified: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=915052&r1=915051&r2=915052&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (original)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Mon Feb 22 20:43:29 2010
@@ -28,7 +28,7 @@
     from threading import Thread
     from thread import get_ident
     from array import array
-from hashlib import md5
+from md5 import md5
 import time, random, sys, os
 from random import randint, gauss
 from optparse import OptionParser
@@ -138,17 +138,18 @@
 
 
 class Operation(Thread):
-    def __init__(self, i, counts, latencies):
+    def __init__(self, i, opcounts, keycounts, latencies):
         Thread.__init__(self)
         # generator of the keys to be used
         self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
         # we can't use a local counter, since that won't be visible to the parent
-        # under multiprocessing.  instead, the parent passes a "counts" array
+        # under multiprocessing.  instead, the parent passes an "opcounts" array
         # and an index that is our assigned counter.
         self.idx = i
-        self.counts = counts
-        # similarly, a shared array for latency totals
+        self.opcounts = opcounts
+        # similarly, shared arrays for latency and key totals
         self.latencies = latencies
+        self.keycounts = keycounts
         # random host for pseudo-load-balancing
         [hostname] = random.sample(nodes, 1)
         # open client
@@ -180,7 +181,8 @@
                 else:
                     raise
             self.latencies[self.idx] += time.time() - start
-            self.counts[self.idx] += 1
+            self.opcounts[self.idx] += 1
+            self.keycounts[self.idx] += 1
 
 
 class Reader(Operation):
@@ -203,7 +205,8 @@
                         else:
                             raise
                     self.latencies[self.idx] += time.time() - start
-                    self.counts[self.idx] += 1
+                    self.opcounts[self.idx] += 1
+                    self.keycounts[self.idx] += 1
         else:
             parent = ColumnParent('Standard1')
             for i in xrange(keys_per_thread):
@@ -220,7 +223,8 @@
                     else:
                         raise
                 self.latencies[self.idx] += time.time() - start
-                self.counts[self.idx] += 1
+                self.opcounts[self.idx] += 1
+                self.keycounts[self.idx] += 1
 
 class RangeSlicer(Operation):
     def run(self):
@@ -249,9 +253,10 @@
                         else:
                             raise
                     self.latencies[self.idx] += time.time() - begin
-                    self.counts[self.idx] += 1
+                    self.opcounts[self.idx] += 1
                 current += len(r) + 1
-                last += len(r)
+                last += current + options.rangecount
+                self.keycounts[self.idx] += len(r)
         else:
             parent = ColumnParent('Standard1')
             while current < end:
@@ -267,34 +272,37 @@
                     if options.ignore:
                         print e
                     else:
+                        print start, finish
                         raise
                 current += len(r) + 1
-                last += len(r)
+                last += current + options.rangecount
                 self.latencies[self.idx] += time.time() - begin
-                self.counts[self.idx] += 1
+                self.opcounts[self.idx] += 1
+                self.keycounts[self.idx] += len(r)
 
 
 class OperationFactory:
     @staticmethod
-    def create(type, i, counts, latencies):
+    def create(type, i, opcounts, keycounts, latencies):
         if type == 'read':
-            return Reader(i, counts, latencies)
+            return Reader(i, opcounts, keycounts, latencies)
         elif type == 'insert':
-            return Inserter(i, counts, latencies)
+            return Inserter(i, opcounts, keycounts, latencies)
         elif type == 'rangeslice':
-            return RangeSlicer(i, counts, latencies)
+            return RangeSlicer(i, opcounts, keycounts, latencies)
         else:
             raise RuntimeError, 'Unsupported op!'
 
 
 class Stress(object):
-    counts = array('i', [0] * n_threads)
+    opcounts = array('i', [0] * n_threads)
     latencies = array('d', [0] * n_threads)
+    keycounts = array('i', [0] * n_threads)
 
     def create_threads(self,type):
         threads = []
         for i in xrange(n_threads):
-            th = OperationFactory.create(type, i, self.counts, self.latencies)
+            th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies)
             threads.append(th)
             th.start()
         return threads
@@ -305,19 +313,24 @@
             outf = open(filename,'w')
         else:
             outf = sys.stdout
-        outf.write('total,interval_op_rate,avg_latency,elapsed_time\n')
-        total = old_total = latency = old_latency = 0
+        outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n')
+        total = old_total = latency = keycount = old_keycount = old_latency = 0
         while True:
             time.sleep(options.interval)
-            old_total, old_latency = total, latency
-            total = sum(self.counts[th.idx] for th in threads)
+            old_total, old_latency, old_keycount = total, latency, keycount
+            total = sum(self.opcounts[th.idx] for th in threads)
             latency = sum(self.latencies[th.idx] for th in threads)
-            delta = total - old_total
+            keycount = sum(self.keycounts[th.idx] for th in threads)
+            opdelta = total - old_total
+            keydelta = keycount - old_keycount
             delta_latency = latency - old_latency
-            delta_formatted = (delta_latency / delta) if delta > 0 else 'NAN'
+            if opdelta > 0:
+                delta_formatted = (delta_latency / opdelta)
+            else:
+                delta_formatted = 'NaN'
             elapsed_t = int(time.time() - start_t)
-            outf.write('%d,%d,%s,%d\n' 
-                       % (total, delta / options.interval, delta_formatted, elapsed_t))
+            outf.write('%d,%d,%d,%s,%d\n' 
+                       % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t))
             if not [th for th in threads if th.isAlive()]:
                 break