You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/15 22:45:48 UTC

svn commit: r891010 - in /incubator/cassandra/trunk: contrib/py_stress/ contrib/py_stress/stress.py test/system/stress.py

Author: jbellis
Date: Tue Dec 15 21:45:47 2009
New Revision: 891010

URL: http://svn.apache.org/viewvc?rev=891010&view=rev
Log:
make stress.py standalone and move to contrib/.  patch by Brandon Williams; reviewed by jbellis for CASSANDRA-635

Added:
    incubator/cassandra/trunk/contrib/py_stress/
    incubator/cassandra/trunk/contrib/py_stress/stress.py   (with props)
Removed:
    incubator/cassandra/trunk/test/system/stress.py

Added: incubator/cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/py_stress/stress.py?rev=891010&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/py_stress/stress.py (added)
+++ incubator/cassandra/trunk/contrib/py_stress/stress.py Tue Dec 15 21:45:47 2009
@@ -0,0 +1,205 @@
+#!/usr/bin/python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# expects a Cassandra server to be running and listening on port 9160.
+# (read tests expect insert tests to have run first too.)
+
+have_multiproc = False
+try:
+    from multiprocessing import Array as array, Process as Thread
+    from uuid import uuid1 as get_ident
+    Thread.isAlive = Thread.is_alive
+    have_multiproc = True
+except ImportError:
+    from threading import Thread
+    from thread import get_ident
+    from array import array
+from hashlib import md5
+import time, random, sys, os
+from random import randint, gauss
+from optparse import OptionParser
+
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.transport import THttpClient
+from thrift.protocol import TBinaryProtocol
+
+try:
+    from cassandra import Cassandra
+    from Cassandra.ttypes import *
+except ImportError:
+    # add cassandra directory to sys.path
+    L = os.path.abspath(__file__).split(os.path.sep)[:-3]
+    root = os.path.sep.join(L)
+    _ipath = os.path.join(root, 'interface', 'gen-py')
+    sys.path.append(os.path.join(_ipath, 'cassandra'))
+    import Cassandra
+    from ttypes import *
+except ImportError:
+    print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'"
+    sys.exit(2)
+
+
+parser = OptionParser()
+parser.add_option('-n', '--num-keys', type="int", dest="numkeys",
+                  help="Number of keys", default=1000**2)
+parser.add_option('-t', '--threads', type="int", dest="threads",
+                  help="Number of threads/procs to use", default=50)
+parser.add_option('-c', '--columns', type="int", dest="columns",
+                  help="Number of columns per key", default=5)
+parser.add_option('-d', '--nodes', type="string", dest="nodes",
+                  help="Host nodes (comma separated)", default="localhost")
+parser.add_option('-s', '--stdev', type="int", dest="stdev", default=0.3,
+                  help="standard deviation factor")
+parser.add_option('-r', '--random', action="store_true", dest="random",
+                  help="use random key generator (stdev will have no effect)")
+parser.add_option('-f', '--file', type="string", dest="file", 
+                  help="write output to file")
+parser.add_option('-p', '--port', type="int", default=9160, dest="port",
+                  help="thrift port")
+parser.add_option('-m', '--framed', action="store_true", dest="framed",
+                  help="use framed transport")
+parser.add_option('-o', '--operation', type="choice", dest="operation",
+                  default="insert", choices=('insert', 'read'),
+                  help="operation to perform")
+
+(options, args) = parser.parse_args()
+ 
+total_keys = options.numkeys
+n_threads = options.threads
+keys_per_thread = total_keys / n_threads
+columns_per_key = options.columns
+# this allows client to round robin requests directly for
+# simple request load-balancing
+nodes = options.nodes.split(',')
+
+# a generator that generates all keys according to a bell curve centered
+# around the middle of the keys generated (0..total_keys).  Remember that
+# about 68% of keys will be within stdev away from the mean and 
+# about 95% within 2*stdev.
+stdev = total_keys * options.stdev
+mean = total_keys / 2
+
+def get_client(host='127.0.0.1', port=9160, framed=False):
+    socket = TSocket.TSocket(host, port)
+    if framed:
+        transport = TTransport.TFramedTransport(socket)
+    else:
+        transport = TTransport.TBufferedTransport(socket)
+    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
+    client = Cassandra.Client(protocol)
+    client.transport = transport
+    return client
+
+def key_generator_gauss():
+    while True:
+        guess = gauss(mean, stdev)
+        if 0 <= guess < total_keys:
+            return int(guess)
+    
+# a generator that will generate all keys w/ equal probability.  this is the
+# worst case for caching.
+key_generator_random = lambda: randint(0, total_keys - 1)
+
+key_generator = key_generator_gauss
+if options.random:
+    key_generator = key_generator_random
+
+class Operation(Thread):
+    def __init__(self, i, counts):
+        Thread.__init__(self)
+        self.range = xrange(keys_per_thread * i, keys_per_thread * (i + 1))
+        self.idx = i
+        self.counts = counts
+        [hostname] = random.sample(nodes, 1)
+        self.cclient = get_client(host=hostname,port=options.port,
+                                  framed=options.framed)
+        self.cclient.transport.open()
+
+class Inserter(Operation):
+    def run(self):
+        data = md5(str(get_ident())).hexdigest()
+        columns = [Column(chr(ord('A') + j), data, 0) for j in xrange(columns_per_key)]
+        for i in self.range:
+            key = str(i)
+            cfmap = {'Standard1': [ColumnOrSuperColumn(column=c) for c in columns]}
+            self.cclient.batch_insert('Keyspace1', key, cfmap, ConsistencyLevel.ONE)
+            self.counts[self.idx]=self.counts[self.idx]+1
+
+class Reader(Operation):
+    def run(self):
+        parent = ColumnParent('Standard1')
+        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
+        for i in xrange(keys_per_thread):
+            key = str(key_generator())
+            self.cclient.get_slice('Keyspace1', key, parent, p, ConsistencyLevel.ONE)
+            self.counts[self.idx]=self.counts[self.idx]+1
+
+class OperationFactory:
+    @staticmethod
+    def create(type,i,counts):
+        if type == 'read':
+            return Reader(i, counts)
+        elif type == 'insert':
+            return Inserter(i, counts)
+        else:
+            raise RuntimeError, 'Unsupported op!'
+
+class Stress(object):
+    counts = array('i', [0]*n_threads)
+
+    def create_threads(self,type):
+        threads = []
+        for i in xrange(n_threads):
+            th = OperationFactory.create(type,i, self.counts)
+            threads.append(th)
+            th.start()
+        return threads
+
+    def run_test(self,filename,threads):
+        start_t = time.time()
+        if filename:
+            outf = open(filename,'w')
+        else:
+            outf = sys.stdout
+        outf.write('total,interval_op_rate,elapsed_time\n')
+        total = old_total = 0
+        while True:
+            interval = 10
+            time.sleep(interval)
+            old_total = total
+            total = sum(self.counts[th.idx] for th in threads)
+            delta = total - old_total
+            elapsed_t = int(time.time()-start_t)
+            outf.write('%d,%d,%d\n' % (total, delta / interval,elapsed_t))
+            if not [th for th in threads if th.isAlive()]:
+                break
+
+    def insert(self):
+        threads = self.create_threads('insert')
+        self.run_test(options.file,threads);
+
+    def read(self):
+        threads = self.create_threads('read')
+        self.run_test(options.file,threads);
+        
+stresser = Stress()
+benchmark = getattr(stresser, options.operation, None)
+if not have_multiproc:
+    print """WARNING: multiprocessing not present, threading will be used.
+        Benchmark may not be accurate!"""
+benchmark()

Propchange: incubator/cassandra/trunk/contrib/py_stress/stress.py
------------------------------------------------------------------------------
    svn:eol-style = native