You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/24 16:21:28 UTC
git commit: Remove py_stress. Patch by brandonwilliams,
reviewed by slebresne for CASSANDRA-3914
Updated Branches:
refs/heads/cassandra-1.1 ea28f42d7 -> 257d36e30
Remove py_stress.
Patch by brandonwilliams, reviewed by slebresne for CASSANDRA-3914
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257d36e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257d36e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257d36e3
Branch: refs/heads/cassandra-1.1
Commit: 257d36e30fb10f4b85cb2290b3898c8f402f38d0
Parents: ea28f42
Author: Brandon Williams <br...@apache.org>
Authored: Fri Feb 24 09:09:53 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Feb 24 09:09:53 2012 -0600
----------------------------------------------------------------------
tools/py_stress/README.txt | 67 -----
tools/py_stress/stress.py | 522 ---------------------------------------
2 files changed, 0 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/py_stress/README.txt b/tools/py_stress/README.txt
deleted file mode 100644
index d7b202a..0000000
--- a/tools/py_stress/README.txt
+++ /dev/null
@@ -1,67 +0,0 @@
-stress.py
-=========
-
-Description
------------
-
-stress.py is a tool for benchmarking and load testing a Cassandra cluster.
-
-Prerequisites
--------------
-
-Any of the following will work:
-
- * python2.4 w/multiprocessing
- * python2.5 w/multiprocessing
- * python2.6 (multiprocessing is in the stdlib)
-
-You can opt not to use multiprocessing and threads will be used instead, but
-python's GIL will be the limiting factor, not Cassandra, so the results will not be
-accurate. A warning to this effect will be issued each time you run the program.
-
-Additionally, you will need to generate the thrift bindings for python: run
-'ant gen-thrift-py' in the top-level Cassandra directory.
-
-stress.py will create the keyspace and column families it needs if they do not
-exist during the insert operation.
-
-Usage
------
-
-There are four different modes of operation:
-
- * inserting (loading test data)
- * reading
- * range slicing (only works with the OrderPreservingPartitioner)
- * indexed range slicing (works with RandomPartitioner on indexed ColumnFamilies)
-
-Important options:
- -o or --operation
- Sets the operation mode, one of 'insert', 'read', 'rangeslice', or 'indexedrangeslice'
- -n or --num-keys:
- the number of rows to insert/read/slice
- -d or --nodes:
- the node(s) to perform the test against. For multiple nodes, supply a
- comma-separated list without spaces, ex: cassandra1,cassandra2,cassandra3
- -y or --family-type:
- Sets the ColumnFamily type. One of 'regular', or 'super'. If using super,
- you probably want to set the -u option also.
- -c or --columns:
- the number of columns per row, defaults to 5
- -u or --supercolumns:
- use the number of supercolumns specified NOTE: you must set the -y
- option appropriately, or this option has no effect.
- -g or --get-range-slice-count:
- This is only used for the rangeslice operation and will *NOT* work with
-      the RandomPartitioner. You must set the OrderPreservingPartitioner in your
- storage-conf.xml (note that you will need to wipe all existing data
-      when switching partitioners.) This option sets the number of rows to
- slice at a time and defaults to 1000.
- -r or --random:
- Only used for reads. By default, stress.py will perform reads on rows
-      with a gaussian distribution, which will cause some repeats. Setting
- this option makes the reads completely random instead.
- -i or --progress-interval:
- The interval, in seconds, at which progress will be output.
-
-Remember that you must perform inserts before performing reads or range slices.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257d36e3/tools/py_stress/stress.py
----------------------------------------------------------------------
diff --git a/tools/py_stress/stress.py b/tools/py_stress/stress.py
deleted file mode 100644
index 319fda3..0000000
--- a/tools/py_stress/stress.py
+++ /dev/null
@@ -1,522 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# expects a Cassandra server to be running and listening on port 9160.
-# (read tests expect insert tests to have run first too.)
-
-from __future__ import with_statement
-
-have_multiproc = False
-try:
- from multiprocessing import Array as array, Process as Thread
- from uuid import uuid1 as get_ident
- array('i', 1) # catch "This platform lacks a functioning sem_open implementation"
- Thread.isAlive = Thread.is_alive
- have_multiproc = True
-except ImportError:
- from threading import Thread
- from thread import get_ident
- from array import array
-from hashlib import md5
-import time, random, sys, os
-from random import randint, gauss
-from optparse import OptionParser
-
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-try:
- from cassandra import Cassandra
- from cassandra.ttypes import *
-except ImportError:
- # add cassandra directory to sys.path
- L = os.path.abspath(__file__).split(os.path.sep)[:-3]
- root = os.path.sep.join(L)
- _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py')
- sys.path.append(os.path.join(_ipath, 'cassandra'))
- import Cassandra
- from ttypes import *
-except ImportError:
- print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'"
- sys.exit(2)
-
-try:
- from thrift.protocol import fastbinary
-except ImportError:
- print "WARNING: thrift binary extension not found, benchmark will not be accurate!"
-
-parser = OptionParser()
-parser.add_option('-n', '--num-keys', type="int", dest="numkeys",
- help="Number of keys", default=1000**2)
-parser.add_option('-N', '--skip-keys', type="float", dest="skipkeys",
- help="Fraction of keys to skip initially", default=0)
-parser.add_option('-t', '--threads', type="int", dest="threads",
- help="Number of threads/procs to use", default=50)
-parser.add_option('-c', '--columns', type="int", dest="columns",
- help="Number of columns per key", default=5)
-parser.add_option('-S', '--column-size', type="int", dest="column_size",
- help="Size of column values in bytes", default=34)
-parser.add_option('-C', '--cardinality', type="int", dest="cardinality",
- help="Number of unique values stored in columns", default=50)
-parser.add_option('-d', '--nodes', type="string", dest="nodes",
- help="Host nodes (comma separated)", default="localhost")
-parser.add_option('-D', '--nodefile', type="string", dest="nodefile",
- help="File containing list of nodes (one per line)", default=None)
-parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
- help="standard deviation factor")
-parser.add_option('-r', '--random', action="store_true", dest="random",
- help="use random key generator (stdev will have no effect)")
-parser.add_option('-f', '--file', type="string", dest="file",
- help="write output to file")
-parser.add_option('-p', '--port', type="int", default=9160, dest="port",
- help="thrift port")
-parser.add_option('-m', '--unframed', action="store_true", dest="unframed",
- help="use unframed transport")
-parser.add_option('-o', '--operation', type="choice", dest="operation",
- default="insert", choices=('insert', 'read', 'rangeslice',
- 'indexedrangeslice', 'multiget'),
- help="operation to perform")
-parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
- help="number of super columns per key")
-parser.add_option('-y', '--family-type', type="choice", dest="cftype",
- choices=('regular','super'), default='regular',
- help="column family type")
-parser.add_option('-k', '--keep-going', action="store_true", dest="ignore",
- help="ignore errors inserting or reading")
-parser.add_option('-i', '--progress-interval', type="int", default=10,
- dest="interval", help="progress report interval (seconds)")
-parser.add_option('-g', '--keys-per-call', type="int", default=1000,
- dest="rangecount",
- help="amount of keys to get_range_slices or multiget per call")
-parser.add_option('-l', '--replication-factor', type="int", default=1,
- dest="replication",
- help="replication factor to use when creating needed column families")
-parser.add_option('-e', '--consistency-level', type="str", default='ONE',
- dest="consistency", help="consistency level to use")
-parser.add_option('-x', '--create-index', type="choice",
- choices=('keys','keys_bitmap', 'none'), default='none',
- dest="index", help="type of index to create on needed column families")
-
-(options, args) = parser.parse_args()
-
-total_keys = options.numkeys
-n_threads = options.threads
-keys_per_thread = total_keys / n_threads
-columns_per_key = options.columns
-supers_per_key = options.supers
-# this allows client to round robin requests directly for
-# simple request load-balancing
-nodes = options.nodes.split(',')
-if options.nodefile != None:
- with open(options.nodefile) as f:
- nodes = [n.strip() for n in f.readlines() if len(n.strip()) > 0]
-
-#format string for keys
-fmt = '%0' + str(len(str(total_keys))) + 'd'
-
-# a generator that generates all keys according to a bell curve centered
-# around the middle of the keys generated (0..total_keys). Remember that
-# about 68% of keys will be within stdev away from the mean and
-# about 95% within 2*stdev.
-stdev = total_keys * options.stdev
-mean = total_keys / 2
-
-consistency = getattr(ConsistencyLevel, options.consistency, None)
-if consistency is None:
- print "%s is not a valid consistency level" % options.consistency
- sys.exit(3)
-
-# generates a list of unique, deterministic values
-def generate_values():
- values = []
- for i in xrange(0, options.cardinality):
- h = md5(str(i)).hexdigest()
- values.append(h * int(options.column_size/len(h)) + h[:options.column_size % len(h)])
- return values
-
-def key_generator_gauss():
- while True:
- guess = gauss(mean, stdev)
- if 0 <= guess < total_keys:
- return fmt % int(guess)
-
-# a generator that will generate all keys w/ equal probability. this is the
-# worst case for caching.
-def key_generator_random():
- return fmt % randint(0, total_keys - 1)
-
-key_generator = key_generator_gauss
-if options.random:
- key_generator = key_generator_random
-
-
-def get_client(host='127.0.0.1', port=9160):
- socket = TSocket.TSocket(host, port)
- if options.unframed:
- transport = TTransport.TBufferedTransport(socket)
- else:
- transport = TTransport.TFramedTransport(socket)
- protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
- client = Cassandra.Client(protocol)
- client.transport = transport
- return client
-
-def make_keyspaces():
- colms = []
- if options.index == 'keys':
- colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS)]
- elif options.index == 'keys_bitmap':
- colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP)]
- cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms),
- CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')]
- keyspace = KsDef(name='Keyspace1',
- strategy_class='org.apache.cassandra.locator.SimpleStrategy',
- strategy_options={'replication_factor': str(options.replication)},
- cf_defs=cfams)
- client = get_client(nodes[0], options.port)
- client.transport.open()
- try:
- client.system_add_keyspace(keyspace)
- print "Created keyspaces. Sleeping %ss for propagation." % len(nodes)
- time.sleep(len(nodes))
- except InvalidRequestException, e:
- print e.why
- client.transport.close()
-
-class Operation(Thread):
- def __init__(self, i, opcounts, keycounts, latencies):
- Thread.__init__(self)
- # generator of the keys to be used
- self.range = xrange(int(keys_per_thread * (i + options.skipkeys)),
- keys_per_thread * (i + 1))
- # we can't use a local counter, since that won't be visible to the parent
- # under multiprocessing. instead, the parent passes a "opcounts" array
- # and an index that is our assigned counter.
- self.idx = i
- self.opcounts = opcounts
- # similarly, a shared array for latency and key totals
- self.latencies = latencies
- self.keycounts = keycounts
- # random host for pseudo-load-balancing
- [hostname] = random.sample(nodes, 1)
- # open client
- self.cclient = get_client(hostname, options.port)
- self.cclient.transport.open()
- self.cclient.set_keyspace('Keyspace1')
-
-class Inserter(Operation):
- def run(self):
- values = generate_values()
- columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
- if 'super' == options.cftype:
- supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
- for i in self.range:
- key = fmt % i
- if 'super' == options.cftype:
- cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}}
- else:
- cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}}
- # set the correct column values for this row
- value = values[i % len(values)]
- for column in columns:
- column.value = value
- start = time.time()
- try:
- self.cclient.batch_mutate(cfmap, consistency)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - start
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += 1
-
-
-class Reader(Operation):
- def run(self):
- p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
- if 'super' == options.cftype:
- for i in xrange(keys_per_thread):
- key = key_generator()
- for j in xrange(supers_per_key):
- parent = ColumnParent('Super1', 'S' + str(j))
- start = time.time()
- try:
- r = self.cclient.get_slice(key, parent, p, consistency)
- if not r: raise RuntimeError("Key %s not found" % key)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - start
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += 1
- else:
- parent = ColumnParent('Standard1')
- for i in xrange(keys_per_thread):
- key = key_generator()
- start = time.time()
- try:
- r = self.cclient.get_slice(key, parent, p, consistency)
- if not r: raise RuntimeError("Key %s not found" % key)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - start
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += 1
-
-class RangeSlicer(Operation):
- def run(self):
- begin = self.range[0]
- end = self.range[-1]
- current = begin
- last = current + options.rangecount
- p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
- if 'super' == options.cftype:
- while current < end:
- keyrange = KeyRange(fmt % current, fmt % last, count = options.rangecount)
- res = []
- for j in xrange(supers_per_key):
- parent = ColumnParent('Super1', 'S' + str(j))
- begin = time.time()
- try:
- res = self.cclient.get_range_slices(parent, p, keyrange, consistency)
- if not res: raise RuntimeError("Key %s not found" % key)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - begin
- self.opcounts[self.idx] += 1
- current += len(r) + 1
- last = current + len(r) + 1
- self.keycounts[self.idx] += len(r)
- else:
- parent = ColumnParent('Standard1')
- while current < end:
- start = fmt % current
- finish = fmt % last
- keyrange = KeyRange(start, finish, count = options.rangecount)
- begin = time.time()
- try:
- r = self.cclient.get_range_slices(parent, p, keyrange, consistency)
- if not r: raise RuntimeError("Range not found:", start, finish)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- print start, finish
- raise
- current += len(r) + 1
- last = current + len(r) + 1
- self.latencies[self.idx] += time.time() - begin
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += len(r)
-
-# Each thread queries for a portion of the unique values
-# TODO: all threads start at the same key: implement wrapping, and start
-# from the thread's appointed range
-class IndexedRangeSlicer(Operation):
- def run(self):
- p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
- values = generate_values()
- parent = ColumnParent('Standard1')
- # the number of rows with a particular value and the number of values we should query for
- expected_per_value = total_keys // len(values)
- valuebegin = self.range[0] // expected_per_value
- valuecount = len(self.range) // expected_per_value
- for valueidx in xrange(valuebegin, valuebegin + valuecount):
- received = 0
- start = fmt % 0
- value = values[valueidx % len(values)]
- expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)]
- while received < expected_per_value:
- clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions)
- begin = time.time()
- try:
- r = self.cclient.get_indexed_slices(parent, clause, p, consistency)
- if not r: raise RuntimeError("No indexed values from offset received:", start)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- continue
- else:
- raise
- received += len(r)
- # convert max key found back to an integer, and increment it
- start = fmt % (1 + max([int(keyslice.key) for keyslice in r]))
- self.latencies[self.idx] += time.time() - begin
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += len(r)
-
-
-class MultiGetter(Operation):
- def run(self):
- p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
- offset = self.idx * keys_per_thread
- count = (((self.idx+1) * keys_per_thread) - offset) / options.rangecount
- if 'super' == options.cftype:
- for x in xrange(count):
- keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
- for j in xrange(supers_per_key):
- parent = ColumnParent('Super1', 'S' + str(j))
- start = time.time()
- try:
- r = self.cclient.multiget_slice(keys, parent, p, consistency)
- if not r: raise RuntimeError("Keys %s not found" % keys)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - start
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += len(keys)
- offset += options.rangecount
- else:
- parent = ColumnParent('Standard1')
- for x in xrange(count):
- keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
- start = time.time()
- try:
- r = self.cclient.multiget_slice(keys, parent, p, consistency)
- if not r: raise RuntimeError("Keys %s not found" % keys)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if options.ignore:
- print e
- else:
- raise
- self.latencies[self.idx] += time.time() - start
- self.opcounts[self.idx] += 1
- self.keycounts[self.idx] += len(keys)
- offset += options.rangecount
-
-
-class OperationFactory:
- @staticmethod
- def create(type, i, opcounts, keycounts, latencies):
- if type == 'read':
- return Reader(i, opcounts, keycounts, latencies)
- elif type == 'insert':
- return Inserter(i, opcounts, keycounts, latencies)
- elif type == 'rangeslice':
- return RangeSlicer(i, opcounts, keycounts, latencies)
- elif type == 'indexedrangeslice':
- return IndexedRangeSlicer(i, opcounts, keycounts, latencies)
- elif type == 'multiget':
- return MultiGetter(i, opcounts, keycounts, latencies)
- else:
- raise RuntimeError, 'Unsupported op!'
-
-
-class Stress(object):
- opcounts = array('i', [0] * n_threads)
- latencies = array('d', [0] * n_threads)
- keycounts = array('i', [0] * n_threads)
-
- def create_threads(self,type):
- threads = []
- for i in xrange(n_threads):
- th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies)
- threads.append(th)
- th.start()
- return threads
-
- def run_test(self,filename,threads):
- start_t = time.time()
- if filename:
- outf = open(filename,'w')
- else:
- outf = sys.stdout
- outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n')
- epoch = total = old_total = latency = keycount = old_keycount = old_latency = 0
- epoch_intervals = (options.interval * 10) # 1 epoch = 1 tenth of a second
- terminate = False
- while not terminate:
- time.sleep(0.1)
- if not [th for th in threads if th.isAlive()]:
- terminate = True
- epoch = epoch + 1
- if terminate or epoch > epoch_intervals:
- epoch = 0
- old_total, old_latency, old_keycount = total, latency, keycount
- total = sum(self.opcounts[th.idx] for th in threads)
- latency = sum(self.latencies[th.idx] for th in threads)
- keycount = sum(self.keycounts[th.idx] for th in threads)
- opdelta = total - old_total
- keydelta = keycount - old_keycount
- delta_latency = latency - old_latency
- if opdelta > 0:
- delta_formatted = (delta_latency / opdelta)
- else:
- delta_formatted = 'NaN'
- elapsed_t = int(time.time() - start_t)
- outf.write('%d,%d,%d,%s,%d\n'
- % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t))
-
- def insert(self):
- threads = self.create_threads('insert')
- self.run_test(options.file,threads);
-
- def read(self):
- threads = self.create_threads('read')
- self.run_test(options.file,threads);
-
- def rangeslice(self):
- threads = self.create_threads('rangeslice')
- self.run_test(options.file,threads);
-
- def indexedrangeslice(self):
- threads = self.create_threads('indexedrangeslice')
- self.run_test(options.file,threads);
-
- def multiget(self):
- threads = self.create_threads('multiget')
- self.run_test(options.file,threads);
-
-stresser = Stress()
-benchmark = getattr(stresser, options.operation, None)
-if not have_multiproc:
- print """WARNING: multiprocessing not present, threading will be used.
- Benchmark may not be accurate!"""
-if options.operation == 'insert':
- make_keyspaces()
-benchmark()