You are viewing a plain text version of this content. The canonical version is in the commits@cassandra.apache.org mailing-list archive.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/24 16:21:28 UTC

git commit: Remove py_stress. Patch by brandonwilliams, reviewed by slebresne for CASSANDRA-3914

Updated Branches:
  refs/heads/cassandra-1.1 ea28f42d7 -> 257d36e30


Remove py_stress.
Patch by brandonwilliams, reviewed by slebresne for CASSANDRA-3914


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257d36e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257d36e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257d36e3

Branch: refs/heads/cassandra-1.1
Commit: 257d36e30fb10f4b85cb2290b3898c8f402f38d0
Parents: ea28f42
Author: Brandon Williams <br...@apache.org>
Authored: Fri Feb 24 09:09:53 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Feb 24 09:09:53 2012 -0600

----------------------------------------------------------------------
 tools/py_stress/README.txt |   67 -----
 tools/py_stress/stress.py  |  522 ---------------------------------------
 2 files changed, 0 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/py_stress/README.txt b/tools/py_stress/README.txt
deleted file mode 100644
index d7b202a..0000000
--- a/tools/py_stress/README.txt
+++ /dev/null
@@ -1,67 +0,0 @@
-stress.py
-=========
-
-Description
------------
-
-stress.py is a tool for benchmarking and load testing a Cassandra cluster.
-
-Prequisites
------------
-
-Any of the following will work:
-
-    * python2.4 w/multiprocessing
-    * python2.5 w/multiprocessing
-    * python2.6 (multiprocessing is in the stdlib)
-
-You can opt not to use multiprocessing and threads will be used instead, but
-python's GIL will be the limiting factor, not Cassandra, so the results will not be
-accurate.  A warning to this effect will be issued each time you run the program.
-
-Additionally, you will need to generate the thrift bindings for python: run
-'ant gen-thrift-py' in the top-level Cassandra directory.
-
-stress.py will create the keyspace and column families it needs if they do not
-exist during the insert operation.
-
-Usage
------
-
-There are three different modes of operation:
-
-    * inserting (loading test data)
-    * reading
-    * range slicing (only works with the OrderPreservingPartioner)
-    * indexed range slicing (works with RandomParitioner on indexed ColumnFamilies)
-
-Important options:
-    -o or --operation
-        Sets the operation mode, one of 'insert', 'read', 'rangeslice', or 'indexedrangeslice'
-    -n or --num-keys:
-        the number of rows to insert/read/slice 
-    -d or --nodes:
-        the node(s) to perform the test against.  For multiple nodes, supply a
-        comma-separated list without spaces, ex: cassandra1,cassandra2,cassandra3
-    -y or --family-type:
-        Sets the ColumnFamily type.  One of 'regular', or 'super'.  If using super,
-        you probably want to set the -u option also.
-    -c or --columns:
-        the number of columns per row, defaults to 5
-    -u or --supercolumns:
-        use the number of supercolumns specified NOTE: you must set the -y
-        option appropriately, or this option has no effect.
-    -g or --get-range-slice-count:
-        This is only used for the rangeslice operation and will *NOT* work with
-        the RandomPartioner.  You must set the OrderPreservingPartioner in your
-        storage-conf.xml (note that you will need to wipe all existing data
-        when switching partioners.)  This option sets the number of rows to
-        slice at a time and defaults to 1000.
-    -r or --random:
-        Only used for reads.  By default, stress.py will perform reads on rows
-        with a guassian distribution, which will cause some repeats.  Setting
-        this option makes the reads completely random instead.
-    -i or --progress-interval:
-        The interval, in seconds, at which progress will be output.
-
-Remember that you must perform inserts before performing reads or range slices.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/stress.py
----------------------------------------------------------------------
diff --git a/tools/py_stress/stress.py b/tools/py_stress/stress.py
deleted file mode 100644
index 319fda3..0000000
--- a/tools/py_stress/stress.py
+++ /dev/null
@@ -1,522 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# expects a Cassandra server to be running and listening on port 9160.
-# (read tests expect insert tests to have run first too.)
-
-from __future__ import with_statement
-
-have_multiproc = False
-try:
-    from multiprocessing import Array as array, Process as Thread
-    from uuid import uuid1 as get_ident
-    array('i', 1) # catch "This platform lacks a functioning sem_open implementation"
-    Thread.isAlive = Thread.is_alive
-    have_multiproc = True
-except ImportError:
-    from threading import Thread
-    from thread import get_ident
-    from array import array
-from hashlib import md5
-import time, random, sys, os
-from random import randint, gauss
-from optparse import OptionParser
-
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-try:
-    from cassandra import Cassandra
-    from cassandra.ttypes import *
-except ImportError:
-    # add cassandra directory to sys.path
-    L = os.path.abspath(__file__).split(os.path.sep)[:-3]
-    root = os.path.sep.join(L)
-    _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py')
-    sys.path.append(os.path.join(_ipath, 'cassandra'))
-    import Cassandra
-    from ttypes import *
-except ImportError:
-    print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'"
-    sys.exit(2)
-
-try:
-    from thrift.protocol import fastbinary
-except ImportError:
-    print "WARNING: thrift binary extension not found, benchmark will not be accurate!"
-
-parser = OptionParser()
-parser.add_option('-n', '--num-keys', type="int", dest="numkeys",
-                  help="Number of keys", default=1000**2)
-parser.add_option('-N', '--skip-keys', type="float", dest="skipkeys",
-                  help="Fraction of keys to skip initially", default=0)
-parser.add_option('-t', '--threads', type="int", dest="threads",
-                  help="Number of threads/procs to use", default=50)
-parser.add_option('-c', '--columns', type="int", dest="columns",
-                  help="Number of columns per key", default=5)
-parser.add_option('-S', '--column-size', type="int", dest="column_size",
-                  help="Size of column values in bytes", default=34)
-parser.add_option('-C', '--cardinality', type="int", dest="cardinality",
-                  help="Number of unique values stored in columns", default=50)
-parser.add_option('-d', '--nodes', type="string", dest="nodes",
-                  help="Host nodes (comma separated)", default="localhost")
-parser.add_option('-D', '--nodefile', type="string", dest="nodefile",
-                  help="File containing list of nodes (one per line)", default=None)
-parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
-                  help="standard deviation factor")
-parser.add_option('-r', '--random', action="store_true", dest="random",
-                  help="use random key generator (stdev will have no effect)")
-parser.add_option('-f', '--file', type="string", dest="file", 
-                  help="write output to file")
-parser.add_option('-p', '--port', type="int", default=9160, dest="port",
-                  help="thrift port")
-parser.add_option('-m', '--unframed', action="store_true", dest="unframed",
-                  help="use unframed transport")
-parser.add_option('-o', '--operation', type="choice", dest="operation",
-                  default="insert", choices=('insert', 'read', 'rangeslice',
-                  'indexedrangeslice', 'multiget'),
-                  help="operation to perform")
-parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
-                  help="number of super columns per key")
-parser.add_option('-y', '--family-type', type="choice", dest="cftype",
-                  choices=('regular','super'), default='regular',
-                  help="column family type")
-parser.add_option('-k', '--keep-going', action="store_true", dest="ignore",
-                  help="ignore errors inserting or reading")
-parser.add_option('-i', '--progress-interval', type="int", default=10,
-                  dest="interval", help="progress report interval (seconds)")
-parser.add_option('-g', '--keys-per-call', type="int", default=1000,
-                  dest="rangecount",
-                  help="amount of keys to get_range_slices or multiget per call")
-parser.add_option('-l', '--replication-factor', type="int", default=1,
-                  dest="replication",
-                  help="replication factor to use when creating needed column families")
-parser.add_option('-e', '--consistency-level', type="str", default='ONE',
-                  dest="consistency", help="consistency level to use")
-parser.add_option('-x', '--create-index', type="choice",
-                  choices=('keys','keys_bitmap', 'none'), default='none',
-                  dest="index", help="type of index to create on needed column families")
-
-(options, args) = parser.parse_args()
- 
-total_keys = options.numkeys
-n_threads = options.threads
-keys_per_thread = total_keys / n_threads
-columns_per_key = options.columns
-supers_per_key = options.supers
-# this allows client to round robin requests directly for
-# simple request load-balancing
-nodes = options.nodes.split(',')
-if options.nodefile != None:
-    with open(options.nodefile) as f:
-        nodes = [n.strip() for n in f.readlines() if len(n.strip()) > 0]
-
-#format string for keys
-fmt = '%0' + str(len(str(total_keys))) + 'd'
-
-# a generator that generates all keys according to a bell curve centered
-# around the middle of the keys generated (0..total_keys).  Remember that
-# about 68% of keys will be within stdev away from the mean and 
-# about 95% within 2*stdev.
-stdev = total_keys * options.stdev
-mean = total_keys / 2
-
-consistency = getattr(ConsistencyLevel, options.consistency, None)
-if consistency is None:
-    print "%s is not a valid consistency level" % options.consistency
-    sys.exit(3)
-
-# generates a list of unique, deterministic values
-def generate_values():
-    values = []
-    for i in xrange(0, options.cardinality):
-        h = md5(str(i)).hexdigest()
-        values.append(h * int(options.column_size/len(h)) + h[:options.column_size % len(h)])
-    return values
-
-def key_generator_gauss():
-    while True:
-        guess = gauss(mean, stdev)
-        if 0 <= guess < total_keys:
-            return fmt % int(guess)
-    
-# a generator that will generate all keys w/ equal probability.  this is the
-# worst case for caching.
-def key_generator_random():
-    return fmt % randint(0, total_keys - 1)
-
-key_generator = key_generator_gauss
-if options.random:
-    key_generator = key_generator_random
-
-
-def get_client(host='127.0.0.1', port=9160):
-    socket = TSocket.TSocket(host, port)
-    if options.unframed:
-        transport = TTransport.TBufferedTransport(socket)
-    else:
-        transport = TTransport.TFramedTransport(socket)
-    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
-    client = Cassandra.Client(protocol)
-    client.transport = transport
-    return client
-
-def make_keyspaces():
-    colms = []
-    if options.index == 'keys':
-        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS)]
-    elif options.index == 'keys_bitmap':
-        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP)]
-    cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms),
-             CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')]
-    keyspace = KsDef(name='Keyspace1',
-                     strategy_class='org.apache.cassandra.locator.SimpleStrategy',
-                     strategy_options={'replication_factor': str(options.replication)}, 
-                     cf_defs=cfams)
-    client = get_client(nodes[0], options.port)
-    client.transport.open()
-    try:
-        client.system_add_keyspace(keyspace)
-        print "Created keyspaces.  Sleeping %ss for propagation." % len(nodes)
-        time.sleep(len(nodes))
-    except InvalidRequestException, e:
-        print e.why
-    client.transport.close()
-
-class Operation(Thread):
-    def __init__(self, i, opcounts, keycounts, latencies):
-        Thread.__init__(self)
-        # generator of the keys to be used
-        self.range = xrange(int(keys_per_thread * (i + options.skipkeys)), 
-                            keys_per_thread * (i + 1))
-        # we can't use a local counter, since that won't be visible to the parent
-        # under multiprocessing.  instead, the parent passes a "opcounts" array
-        # and an index that is our assigned counter.
-        self.idx = i
-        self.opcounts = opcounts
-        # similarly, a shared array for latency and key totals
-        self.latencies = latencies
-        self.keycounts = keycounts
-        # random host for pseudo-load-balancing
-        [hostname] = random.sample(nodes, 1)
-        # open client
-        self.cclient = get_client(hostname, options.port)
-        self.cclient.transport.open()
-        self.cclient.set_keyspace('Keyspace1')
-
-class Inserter(Operation):
-    def run(self):
-        values = generate_values()
-        columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
-        if 'super' == options.cftype:
-            supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
-        for i in self.range:
-            key = fmt % i
-            if 'super' == options.cftype:
-                cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}}
-            else:
-                cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}}
-            # set the correct column values for this row
-            value = values[i % len(values)]
-            for column in columns:
-                column.value = value
-            start = time.time()
-            try:
-                self.cclient.batch_mutate(cfmap, consistency)
-            except KeyboardInterrupt:
-                raise
-            except Exception, e:
-                if options.ignore:
-                    print e
-                else:
-                    raise
-            self.latencies[self.idx] += time.time() - start
-            self.opcounts[self.idx] += 1
-            self.keycounts[self.idx] += 1
-
-
-class Reader(Operation):
-    def run(self):
-        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
-        if 'super' == options.cftype:
-            for i in xrange(keys_per_thread):
-                key = key_generator()
-                for j in xrange(supers_per_key):
-                    parent = ColumnParent('Super1', 'S' + str(j))
-                    start = time.time()
-                    try:
-                        r = self.cclient.get_slice(key, parent, p, consistency)
-                        if not r: raise RuntimeError("Key %s not found" % key)
-                    except KeyboardInterrupt:
-                        raise
-                    except Exception, e:
-                        if options.ignore:
-                            print e
-                        else:
-                            raise
-                    self.latencies[self.idx] += time.time() - start
-                    self.opcounts[self.idx] += 1
-                    self.keycounts[self.idx] += 1
-        else:
-            parent = ColumnParent('Standard1')
-            for i in xrange(keys_per_thread):
-                key = key_generator()
-                start = time.time()
-                try:
-                    r = self.cclient.get_slice(key, parent, p, consistency)
-                    if not r: raise RuntimeError("Key %s not found" % key)
-                except KeyboardInterrupt:
-                    raise
-                except Exception, e:
-                    if options.ignore:
-                        print e
-                    else:
-                        raise
-                self.latencies[self.idx] += time.time() - start
-                self.opcounts[self.idx] += 1
-                self.keycounts[self.idx] += 1
-
-class RangeSlicer(Operation):
-    def run(self):
-        begin = self.range[0]
-        end = self.range[-1]
-        current = begin
-        last = current + options.rangecount
-        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
-        if 'super' == options.cftype:
-            while current < end:
-                keyrange = KeyRange(fmt % current, fmt % last, count = options.rangecount)
-                res = []
-                for j in xrange(supers_per_key):
-                    parent = ColumnParent('Super1', 'S' + str(j)) 
-                    begin = time.time()
-                    try:
-                        res = self.cclient.get_range_slices(parent, p, keyrange, consistency)
-                        if not res: raise RuntimeError("Key %s not found" % key)
-                    except KeyboardInterrupt:
-                        raise
-                    except Exception, e:
-                        if options.ignore:
-                            print e
-                        else:
-                            raise
-                    self.latencies[self.idx] += time.time() - begin
-                    self.opcounts[self.idx] += 1
-                current += len(r) + 1
-                last = current + len(r) + 1
-                self.keycounts[self.idx] += len(r)
-        else:
-            parent = ColumnParent('Standard1')
-            while current < end:
-                start = fmt % current 
-                finish = fmt % last
-                keyrange = KeyRange(start, finish, count = options.rangecount)
-                begin = time.time()
-                try:
-                    r = self.cclient.get_range_slices(parent, p, keyrange, consistency)
-                    if not r: raise RuntimeError("Range not found:", start, finish)
-                except KeyboardInterrupt:
-                    raise
-                except Exception, e:
-                    if options.ignore:
-                        print e
-                    else:
-                        print start, finish
-                        raise
-                current += len(r) + 1
-                last = current + len(r)  + 1
-                self.latencies[self.idx] += time.time() - begin
-                self.opcounts[self.idx] += 1
-                self.keycounts[self.idx] += len(r)
-
-# Each thread queries for a portion of the unique values
-# TODO: all threads start at the same key: implement wrapping, and start
-# from the thread's appointed range
-class IndexedRangeSlicer(Operation):
-    def run(self):
-        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
-        values = generate_values()
-        parent = ColumnParent('Standard1')
-        # the number of rows with a particular value and the number of values we should query for
-        expected_per_value = total_keys // len(values)
-        valuebegin = self.range[0] // expected_per_value
-        valuecount = len(self.range) // expected_per_value
-        for valueidx in xrange(valuebegin, valuebegin + valuecount):
-            received = 0
-            start = fmt % 0
-            value = values[valueidx % len(values)]
-            expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)]
-            while received < expected_per_value:
-                clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions)
-                begin = time.time()
-                try:
-                    r = self.cclient.get_indexed_slices(parent, clause, p, consistency)
-                    if not r: raise RuntimeError("No indexed values from offset received:", start)
-                except KeyboardInterrupt:
-                    raise
-                except Exception, e:
-                    if options.ignore:
-                        print e
-                        continue
-                    else:
-                        raise
-                received += len(r)
-                # convert max key found back to an integer, and increment it
-                start = fmt % (1 + max([int(keyslice.key) for keyslice in r]))
-                self.latencies[self.idx] += time.time() - begin
-                self.opcounts[self.idx] += 1
-                self.keycounts[self.idx] += len(r)
-
-
-class MultiGetter(Operation):
-    def run(self):
-        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
-        offset = self.idx * keys_per_thread
-        count = (((self.idx+1) * keys_per_thread) - offset) / options.rangecount
-        if 'super' == options.cftype:
-            for x in xrange(count):
-                keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
-                for j in xrange(supers_per_key):
-                    parent = ColumnParent('Super1', 'S' + str(j))
-                    start = time.time()
-                    try:
-                        r = self.cclient.multiget_slice(keys, parent, p, consistency)
-                        if not r: raise RuntimeError("Keys %s not found" % keys)
-                    except KeyboardInterrupt:
-                        raise
-                    except Exception, e:
-                        if options.ignore:
-                            print e
-                        else:
-                            raise
-                    self.latencies[self.idx] += time.time() - start
-                    self.opcounts[self.idx] += 1
-                    self.keycounts[self.idx] += len(keys)
-                    offset += options.rangecount
-        else:
-            parent = ColumnParent('Standard1')
-            for x in xrange(count):
-                keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
-                start = time.time()
-                try:
-                    r = self.cclient.multiget_slice(keys, parent, p, consistency)
-                    if not r: raise RuntimeError("Keys %s not found" % keys)
-                except KeyboardInterrupt:
-                    raise
-                except Exception, e:
-                    if options.ignore:
-                        print e
-                    else:
-                        raise
-                self.latencies[self.idx] += time.time() - start
-                self.opcounts[self.idx] += 1
-                self.keycounts[self.idx] += len(keys)
-                offset += options.rangecount
-
-
-class OperationFactory:
-    @staticmethod
-    def create(type, i, opcounts, keycounts, latencies):
-        if type == 'read':
-            return Reader(i, opcounts, keycounts, latencies)
-        elif type == 'insert':
-            return Inserter(i, opcounts, keycounts, latencies)
-        elif type == 'rangeslice':
-            return RangeSlicer(i, opcounts, keycounts, latencies)
-        elif type == 'indexedrangeslice':
-            return IndexedRangeSlicer(i, opcounts, keycounts, latencies)
-        elif type == 'multiget':
-            return MultiGetter(i, opcounts, keycounts, latencies)
-        else:
-            raise RuntimeError, 'Unsupported op!'
-
-
-class Stress(object):
-    opcounts = array('i', [0] * n_threads)
-    latencies = array('d', [0] * n_threads)
-    keycounts = array('i', [0] * n_threads)
-
-    def create_threads(self,type):
-        threads = []
-        for i in xrange(n_threads):
-            th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies)
-            threads.append(th)
-            th.start()
-        return threads
-
-    def run_test(self,filename,threads):
-        start_t = time.time()
-        if filename:
-            outf = open(filename,'w')
-        else:
-            outf = sys.stdout
-        outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n')
-        epoch = total = old_total = latency = keycount = old_keycount = old_latency = 0
-        epoch_intervals = (options.interval * 10) # 1 epoch = 1 tenth of a second
-        terminate = False
-        while not terminate:
-            time.sleep(0.1)
-            if not [th for th in threads if th.isAlive()]:
-                terminate = True
-            epoch = epoch + 1
-            if terminate or epoch > epoch_intervals:
-                epoch = 0
-                old_total, old_latency, old_keycount = total, latency, keycount
-                total = sum(self.opcounts[th.idx] for th in threads)
-                latency = sum(self.latencies[th.idx] for th in threads)
-                keycount = sum(self.keycounts[th.idx] for th in threads)
-                opdelta = total - old_total
-                keydelta = keycount - old_keycount
-                delta_latency = latency - old_latency
-                if opdelta > 0:
-                    delta_formatted = (delta_latency / opdelta)
-                else:
-                    delta_formatted = 'NaN'
-                elapsed_t = int(time.time() - start_t)
-                outf.write('%d,%d,%d,%s,%d\n' 
-                           % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t))
-
-    def insert(self):
-        threads = self.create_threads('insert')
-        self.run_test(options.file,threads);
-
-    def read(self):
-        threads = self.create_threads('read')
-        self.run_test(options.file,threads);
-        
-    def rangeslice(self):
-        threads = self.create_threads('rangeslice')
-        self.run_test(options.file,threads);
-
-    def indexedrangeslice(self):
-        threads = self.create_threads('indexedrangeslice')
-        self.run_test(options.file,threads);
-
-    def multiget(self):
-        threads = self.create_threads('multiget')
-        self.run_test(options.file,threads);
-
-stresser = Stress()
-benchmark = getattr(stresser, options.operation, None)
-if not have_multiproc:
-    print """WARNING: multiprocessing not present, threading will be used.
-        Benchmark may not be accurate!"""
-if options.operation == 'insert':
-    make_keyspaces()
-benchmark()