You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/07 19:34:43 UTC

[20/23] git commit: Introduce CQL support for stress tool

Introduce CQL support for stress tool

patch by Jake Luciani and Benedict Elliott Smith


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75364296
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75364296
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75364296

Branch: refs/heads/cassandra-2.1.0
Commit: 75364296cb3749dfa6d4307cf055f1a18bcf7a9b
Parents: 61ee287
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jul 7 18:03:59 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jul 7 18:25:11 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/thrift/ThriftConversion.java      |  19 +
 .../org/apache/cassandra/utils/UUIDGen.java     |   8 +
 tools/bin/cassandra-stress                      |   2 +-
 tools/cqlstress-counter-example.yaml            |  85 +++
 tools/cqlstress-example.yaml                    |  99 +++
 tools/cqlstress-insanity-example.yaml           | 102 +++
 .../org/apache/cassandra/stress/Operation.java  | 218 +-----
 .../apache/cassandra/stress/StressAction.java   | 262 +++----
 .../apache/cassandra/stress/StressMetrics.java  |  17 +-
 .../apache/cassandra/stress/StressProfile.java  | 504 +++++++++++++
 .../org/apache/cassandra/stress/StressYaml.java |  38 +
 .../cassandra/stress/generate/Distribution.java |  57 ++
 .../generate/DistributionBoundApache.java       |  84 +++
 .../stress/generate/DistributionFactory.java    |  31 +
 .../stress/generate/DistributionFixed.java      |  54 ++
 .../stress/generate/DistributionInverted.java   |  37 +
 .../generate/DistributionOffsetApache.java      |  80 +++
 .../cassandra/stress/generate/Partition.java    | 343 +++++++++
 .../stress/generate/PartitionGenerator.java     |  80 +++
 .../stress/generate/RatioDistribution.java      |  25 +
 .../generate/RatioDistributionFactory.java      |  31 +
 .../apache/cassandra/stress/generate/Row.java   |  22 +
 .../stress/generate/SeedGenerator.java          |   8 +
 .../stress/generate/SeedRandomGenerator.java    |  33 +
 .../stress/generate/SeedSeriesGenerator.java    |  21 +
 .../stress/generate/values/Booleans.java        |  37 +
 .../cassandra/stress/generate/values/Bytes.java |  54 ++
 .../cassandra/stress/generate/values/Dates.java |  46 ++
 .../stress/generate/values/Doubles.java         |  37 +
 .../stress/generate/values/Floats.java          |  37 +
 .../stress/generate/values/Generator.java       |  50 ++
 .../stress/generate/values/GeneratorConfig.java |  68 ++
 .../stress/generate/values/HexBytes.java        |  56 ++
 .../stress/generate/values/HexStrings.java      |  55 ++
 .../cassandra/stress/generate/values/Inets.java |  57 ++
 .../stress/generate/values/Integers.java        |  38 +
 .../cassandra/stress/generate/values/Lists.java |  55 ++
 .../cassandra/stress/generate/values/Longs.java |  37 +
 .../cassandra/stress/generate/values/Sets.java  |  54 ++
 .../stress/generate/values/Strings.java         |  49 ++
 .../stress/generate/values/TimeUUIDs.java       |  51 ++
 .../cassandra/stress/generate/values/UUIDs.java |  39 +
 .../cassandra/stress/generatedata/DataGen.java  |  39 -
 .../stress/generatedata/DataGenBytesRandom.java |  45 --
 .../stress/generatedata/DataGenFactory.java     |  30 -
 .../stress/generatedata/DataGenHex.java         |  60 --
 .../DataGenHexFromDistribution.java             |  66 --
 .../generatedata/DataGenHexFromOpIndex.java     |  48 --
 .../generatedata/DataGenStringDictionary.java   | 107 ---
 .../generatedata/DataGenStringRepeats.java      |  90 ---
 .../stress/generatedata/Distribution.java       |  40 --
 .../generatedata/DistributionBoundApache.java   |  63 --
 .../generatedata/DistributionFactory.java       |  31 -
 .../stress/generatedata/DistributionFixed.java  |  46 --
 .../generatedata/DistributionOffsetApache.java  |  61 --
 .../generatedata/DistributionSeqBatch.java      |  68 --
 .../cassandra/stress/generatedata/KeyGen.java   |  54 --
 .../cassandra/stress/generatedata/RowGen.java   |  53 --
 .../generatedata/RowGenDistributedSize.java     | 116 ---
 .../stress/operations/CqlCounterAdder.java      |  77 --
 .../stress/operations/CqlCounterGetter.java     |  69 --
 .../operations/CqlIndexedRangeSlicer.java       | 118 ---
 .../stress/operations/CqlInserter.java          |  88 ---
 .../stress/operations/CqlMultiGetter.java       |  42 --
 .../stress/operations/CqlOperation.java         | 698 ------------------
 .../stress/operations/CqlRangeSlicer.java       |  59 --
 .../cassandra/stress/operations/CqlReader.java  |  91 ---
 .../stress/operations/FixedOpDistribution.java  |  25 +
 .../stress/operations/OpDistribution.java       |  11 +
 .../operations/OpDistributionFactory.java       |  12 +
 .../operations/SampledOpDistribution.java       |  41 ++
 .../SampledOpDistributionFactory.java           |  72 ++
 .../stress/operations/ThriftCounterAdder.java   |  95 ---
 .../stress/operations/ThriftCounterGetter.java  |  68 --
 .../operations/ThriftIndexedRangeSlicer.java    | 114 ---
 .../stress/operations/ThriftInserter.java       | 117 ---
 .../stress/operations/ThriftMultiGetter.java    |  80 ---
 .../stress/operations/ThriftRangeSlicer.java    |  85 ---
 .../stress/operations/ThriftReader.java         |  94 ---
 .../operations/predefined/CqlCounterAdder.java  |  84 +++
 .../operations/predefined/CqlCounterGetter.java |  74 ++
 .../operations/predefined/CqlInserter.java      |  79 ++
 .../operations/predefined/CqlOperation.java     | 714 +++++++++++++++++++
 .../stress/operations/predefined/CqlReader.java |  87 +++
 .../predefined/PredefinedOperation.java         | 248 +++++++
 .../predefined/ThriftCounterAdder.java          |  86 +++
 .../predefined/ThriftCounterGetter.java         |  66 ++
 .../operations/predefined/ThriftInserter.java   |  96 +++
 .../operations/predefined/ThriftReader.java     |  79 ++
 .../operations/userdefined/SchemaInsert.java    | 144 ++++
 .../operations/userdefined/SchemaQuery.java     |  86 +++
 .../operations/userdefined/SchemaStatement.java | 164 +++++
 .../cassandra/stress/settings/Command.java      |  24 +-
 .../stress/settings/CommandCategory.java        |   4 +-
 .../stress/settings/OptionAnyProbabilities.java |  78 ++
 .../stress/settings/OptionDataGen.java          | 203 ------
 .../stress/settings/OptionDistribution.java     |  75 +-
 .../settings/OptionEnumProbabilities.java       |  62 ++
 .../cassandra/stress/settings/OptionMulti.java  |   3 +-
 .../settings/OptionRatioDistribution.java       | 166 +++++
 .../cassandra/stress/settings/OptionSimple.java |   5 +-
 .../stress/settings/SettingsColumn.java         |  50 +-
 .../stress/settings/SettingsCommand.java        |  58 +-
 .../stress/settings/SettingsCommandMixed.java   | 207 ------
 .../settings/SettingsCommandPreDefined.java     | 145 ++++
 .../SettingsCommandPreDefinedMixed.java         | 151 ++++
 .../stress/settings/SettingsCommandUser.java    | 135 ++++
 .../cassandra/stress/settings/SettingsKey.java  |  27 +-
 .../cassandra/stress/settings/SettingsMisc.java |   2 +-
 .../cassandra/stress/settings/SettingsMode.java |   6 +-
 .../stress/settings/SettingsSchema.java         |  77 +-
 .../stress/settings/SettingsTransport.java      |   2 +-
 .../stress/settings/StressSettings.java         |  36 +-
 .../stress/settings/ValidationType.java         |   8 +
 .../cassandra/stress/util/JavaDriverClient.java |  19 +-
 .../stress/util/SmartThriftClient.java          |   4 +-
 .../org/apache/cassandra/stress/util/Timer.java |  17 +-
 .../cassandra/stress/util/TimingInterval.java   |  26 +-
 119 files changed, 5861 insertions(+), 3890 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 491ad6d..ff2f586 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc3
+ * Introduce CQL support for stress tool (CASSANDRA-6146)
  * Fix ClassCastException processing expired messages (CASSANDRA-7496)
  * Fix prepared marker for collections inside UDT (CASSANDRA-7472)
  * Remove left-over populate_io_cache_on_flush and replicate_on_write

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 9573b56..0c75d2c 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -51,6 +51,25 @@ public class ThriftConversion
         throw new AssertionError();
     }
 
+    public static ConsistencyLevel toThrift(org.apache.cassandra.db.ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ANY: return ConsistencyLevel.ANY;
+            case ONE: return ConsistencyLevel.ONE;
+            case TWO: return ConsistencyLevel.TWO;
+            case THREE: return ConsistencyLevel.THREE;
+            case QUORUM: return ConsistencyLevel.QUORUM;
+            case ALL: return ConsistencyLevel.ALL;
+            case LOCAL_QUORUM: return ConsistencyLevel.LOCAL_QUORUM;
+            case EACH_QUORUM: return ConsistencyLevel.EACH_QUORUM;
+            case SERIAL: return ConsistencyLevel.SERIAL;
+            case LOCAL_SERIAL: return ConsistencyLevel.LOCAL_SERIAL;
+            case LOCAL_ONE: return ConsistencyLevel.LOCAL_ONE;
+        }
+        throw new AssertionError();
+    }
+
     // We never return, but returning a RuntimeException allows to write "throw rethrow(e)" without java complaining
     // for methods that have a return value.
     public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index f385744..53293b2 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -25,6 +25,8 @@ import java.util.Collection;
 import java.util.Random;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * The goods are here: www.ietf.org/rfc/rfc4122.txt.
@@ -80,6 +82,12 @@ public class UUIDGen
         return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode);
     }
 
+    @VisibleForTesting
+    public static UUID getTimeUUID(long when, long clockSeqAndNode)
+    {
+        return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode);
+    }
+
     /** creates a type 1 uuid from raw bytes. */
     public static UUID getUUID(ByteBuffer raw)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/bin/cassandra-stress
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra-stress b/tools/bin/cassandra-stress
index 39257cd..c855cf5 100755
--- a/tools/bin/cassandra-stress
+++ b/tools/bin/cassandra-stress
@@ -42,4 +42,4 @@ if [ "x$JAVA" = "x" ]; then
     exit 1
 fi
 
-$JAVA -server -cp $CLASSPATH org.apache.cassandra.stress.Stress $@
+$JAVA -server -ea -cp $CLASSPATH org.apache.cassandra.stress.Stress $@

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-counter-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-counter-example.yaml b/tools/cqlstress-counter-example.yaml
new file mode 100644
index 0000000..a65080a
--- /dev/null
+++ b/tools/cqlstress-counter-example.yaml
@@ -0,0 +1,85 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: counttest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE counttest (
+        name text PRIMARY KEY,
+        count counter
+  ) WITH comment='A table of many types to test wide rows'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), population: uniform(1..1024)
+#
+
+columnspec:
+  - name: name
+    clustering: uniform(1..100)
+    size: uniform(1..4)
+  - name: count
+    population: fixed(1)
+
+insert:
+  partitions: fixed(1)            # number of unique partitions to update in a single operation
+                                  # if perbatch < 1, multiple batches will be used but all partitions will
+                                  # occur in all batches (unless already finished); only the row counts will vary
+  pervisit: fixed(1)/1            # ratio of rows each partition should update in a single visit to the partition,
+                                  # as a proportion of the total possible for the partition
+  perbatch: fixed(1)/1            # number of rows each partition should update in a single batch statement,
+                                  # as a proportion of the proportion we are inserting this visit
+                                  # (i.e. compounds with (and capped by) pervisit)
+  batchtype: UNLOGGED             # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+   simple1: select * from counttest where name = ?
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
new file mode 100644
index 0000000..a997529
--- /dev/null
+++ b/tools/cqlstress-example.yaml
@@ -0,0 +1,99 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: typestest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE typestest (
+        name text,
+        choice boolean,
+        date timestamp,
+        address inet,
+        dbl double,
+        lval bigint,
+        ival int,
+        uid timeuuid,
+        value blob,
+        PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
+  ) WITH COMPACT STORAGE 
+    AND compaction = { 'class':'LeveledCompactionStrategy' }
+    AND comment='A table of many types to test wide rows'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), population: uniform(1..1024)
+#
+columnspec:
+  - name: name
+    size: uniform(1..10)
+    population: uniform(1..1M)     # the range of unique values to select for the field (default is 100Billion)
+  - name: choice
+  - name: date
+    cluster: uniform(1..4)
+  - name: lval
+    population: gaussian(1..1000)
+    cluster: uniform(1..4)
+
+insert:
+  partitions: uniform(1..50)      # number of unique partitions to update in a single operation
+                                  # if perbatch < 1, multiple batches will be used but all partitions will
+                                  # occur in all batches (unless already finished); only the row counts will vary
+  pervisit: uniform(1..10)/10     # ratio of rows each partition should update in a single visit to the partition,
+                                  # as a proportion of the total possible for the partition
+  perbatch: ~exp(1..3)/4          # number of rows each partition should update in a single batch statement,
+                                  # as a proportion of the proportion we are inserting this visit
+                                  # (i.e. compounds with (and capped by) pervisit)
+  batchtype: UNLOGGED             # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+   simple1: select * from typestest where name = ? and choice = ? LIMIT 100
+   range1: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-insanity-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-insanity-example.yaml b/tools/cqlstress-insanity-example.yaml
new file mode 100644
index 0000000..e94c9c3
--- /dev/null
+++ b/tools/cqlstress-insanity-example.yaml
@@ -0,0 +1,102 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: insanitytest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE insanitytest (
+        name text,
+        choice boolean,
+        date timestamp,
+        address inet,
+        dbl double,
+        lval bigint,
+        fval float,
+        ival int,
+        uid timeuuid,
+        dates list<timestamp>,
+        inets set<inet>,
+        value blob,
+        PRIMARY KEY((name, choice), date)
+  ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
+    AND comment='A table of many types to test wide rows and collections'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), population: uniform(1..100B)
+#
+columnspec:
+  - name: name
+    clustering: uniform(1..4)
+  - name: date
+    clustering: gaussian(1..20)
+  - name: lval
+    population: fixed(1)
+  - name: dates
+    clustering: uniform(1..100)
+  - name: inets
+    clustering: uniform(1..200)
+  - name: value
+
+insert:
+  partitions: fixed(1)            # number of unique partitions to update in a single operation
+                                  # if perbatch < 1, multiple batches will be used but all partitions will
+                                  # occur in all batches (unless already finished); only the row counts will vary
+  pervisit: uniform(1..10)/100K   # ratio of rows each partition should update in a single visit to the partition,
+                                  # as a proportion of the total possible for the partition
+  perbatch: fixed(1)/1            # number of rows each partition should update in a single batch statement,
+                                  # as a proportion of the proportion we are inserting this visit
+                                  # (i.e. compounds with (and capped by) pervisit)
+  batchtype: UNLOGGED             # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+   simple1: select * from insanitytest where name = ? and choice = ? LIMIT 100
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 87afb3d..7831074 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -18,203 +18,47 @@
 package org.apache.cassandra.stress;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.cassandra.stress.generatedata.Distribution;
-import org.apache.cassandra.stress.generatedata.KeyGen;
-import org.apache.cassandra.stress.generatedata.RowGen;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.settings.*;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class Operation
 {
-    public final long index;
-    protected final State state;
+    public final StressSettings settings;
+    public final Timer timer;
+    public final PartitionGenerator generator;
+    public final Distribution partitionCount;
 
-    public Operation(State state, long idx)
+    protected List<Partition> partitions;
+
+    public Operation(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount)
     {
-        index = idx;
-        this.state = state;
+        this.generator = generator;
+        this.timer = timer;
+        this.settings = settings;
+        this.partitionCount = partitionCount;
     }
 
     public static interface RunOp
     {
         public boolean run() throws Exception;
-        public String key();
-        public int keyCount();
+        public int partitionCount();
+        public int rowCount();
     }
 
-    // one per thread!
-    public static final class State
+    protected void setPartitions(List<Partition> partitions)
     {
-
-        public final StressSettings settings;
-        public final Timer timer;
-        public final Command type;
-        public final KeyGen keyGen;
-        public final RowGen rowGen;
-        public final Distribution counteradd;
-        public final List<ColumnParent> columnParents;
-        public final StressMetrics metrics;
-        public final SettingsCommandMixed.CommandSelector commandSelector;
-        private final EnumMap<Command, State> substates;
-        private Object cqlCache;
-
-        public State(Command type, StressSettings settings, StressMetrics metrics)
-        {
-            this.type = type;
-            this.timer = metrics.getTiming().newTimer();
-            if (type == Command.MIXED)
-            {
-                commandSelector = ((SettingsCommandMixed) settings.command).selector();
-                substates = new EnumMap<>(Command.class);
-            }
-            else
-            {
-                commandSelector = null;
-                substates = null;
-            }
-            counteradd = settings.command.add.get();
-            this.settings = settings;
-            this.keyGen = settings.keys.newKeyGen();
-            this.rowGen = settings.columns.newRowGen();
-            this.metrics = metrics;
-            this.columnParents = columnParents(type, settings);
-        }
-
-        private State(Command type, State copy)
-        {
-            this.type = type;
-            this.timer = copy.timer;
-            this.rowGen = copy.rowGen;
-            this.keyGen = copy.keyGen;
-            this.columnParents = columnParents(type, copy.settings);
-            this.metrics = copy.metrics;
-            this.settings = copy.settings;
-            this.counteradd = copy.counteradd;
-            this.substates = null;
-            this.commandSelector = null;
-        }
-
-        private List<ColumnParent> columnParents(Command type, StressSettings settings)
-        {
-            if (!settings.columns.useSuperColumns)
-                return Collections.singletonList(new ColumnParent(type.table));
-            else
-            {
-                ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
-                for (int i = 0 ; i < cp.length ; i++)
-                    cp[i] = new ColumnParent(type.supertable).setSuper_column(ByteBufferUtil.bytes("S" + i));
-                return Arrays.asList(cp);
-            }
-        }
-
-
-
-        public boolean isCql3()
-        {
-            return settings.mode.cqlVersion == CqlVersion.CQL3;
-        }
-        public boolean isCql2()
-        {
-            return settings.mode.cqlVersion == CqlVersion.CQL2;
-        }
-        public Object getCqlCache()
-        {
-            return cqlCache;
-        }
-        public void storeCqlCache(Object val)
-        {
-            cqlCache = val;
-        }
-
-        public State substate(Command command)
-        {
-            assert type == Command.MIXED;
-            State substate = substates.get(command);
-            if (substate == null)
-            {
-                substates.put(command, substate = new State(command, this));
-            }
-            return substate;
-        }
-
-    }
-
-    protected ByteBuffer getKey()
-    {
-        return state.keyGen.getKeys(1, index).get(0);
-    }
-
-    protected List<ByteBuffer> getKeys(int count)
-    {
-        return state.keyGen.getKeys(count, index);
-    }
-
-    protected List<ByteBuffer> generateColumnValues(ByteBuffer key)
-    {
-        return state.rowGen.generate(index, key);
-    }
-
-    private int sliceStart(int count)
-    {
-        if (count == state.settings.columns.maxColumnsPerKey)
-            return 0;
-        return 1 + ThreadLocalRandom.current().nextInt(state.settings.columns.maxColumnsPerKey - count);
-    }
-
-    protected SlicePredicate slicePredicate()
-    {
-        final SlicePredicate predicate = new SlicePredicate();
-        if (state.settings.columns.slice)
-        {
-            int count = state.rowGen.count(index);
-            int start = sliceStart(count);
-            predicate.setSlice_range(new SliceRange()
-                                     .setStart(state.settings.columns.names.get(start))
-                                     .setFinish(new byte[] {})
-                                     .setReversed(false)
-                                     .setCount(count)
-            );
-        }
-        else
-            predicate.setColumn_names(randomNames());
-        return predicate;
-    }
-
-    protected List<ByteBuffer> randomNames()
-    {
-        int count = state.rowGen.count(index);
-        List<ByteBuffer> src = state.settings.columns.names;
-        if (count == src.size())
-            return src;
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        List<ByteBuffer> r = new ArrayList<>();
-        int c = 0, o = 0;
-        while (c < count && count + o < src.size())
-        {
-            int leeway = src.size() - (count + o);
-            int spreadover = count - c;
-            o += Math.round(rnd.nextDouble() * (leeway / (double) spreadover));
-            r.add(src.get(o + c++));
-        }
-        while (c < count)
-            r.add(src.get(o + c++));
-        return r;
+        this.partitions = partitions;
     }
 
     /**
@@ -234,13 +78,13 @@ public abstract class Operation
 
     public void timeWithRetry(RunOp run) throws IOException
     {
-        state.timer.start();
+        timer.start();
 
         boolean success = false;
         String exceptionMessage = null;
 
         int tries = 0;
-        for (; tries < state.settings.command.tries; tries++)
+        for (; tries < settings.command.tries; tries++)
         {
             try
             {
@@ -249,7 +93,7 @@ public abstract class Operation
             }
             catch (Exception e)
             {
-                switch (state.settings.log.level)
+                switch (settings.log.level)
                 {
                     case MINIMAL:
                         break;
@@ -269,15 +113,13 @@ public abstract class Operation
             }
         }
 
-        state.timer.stop(run.keyCount());
+        timer.stop(run.partitionCount(), run.rowCount());
 
         if (!success)
         {
-            error(String.format("Operation [%d] x%d key %s (0x%s) %s%n",
-                    index,
+            error(String.format("Operation x%d on key(s) %s: %s%n",
                     tries,
-                    run.key(),
-                    ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(run.key())),
+                    key(),
                     (exceptionMessage == null)
                         ? "Data returned was not validated"
                         : "Error executing: " + exceptionMessage));
@@ -285,6 +127,14 @@ public abstract class Operation
 
     }
 
+    private String key()
+    {
+        List<String> keys = new ArrayList<>();
+        for (Partition partition : partitions)
+            keys.add(partition.getKeyAsString());
+        return keys.toString();
+    }
+
     protected String getExceptionMessage(Exception e)
     {
         String className = e.getClass().getSimpleName();
@@ -294,9 +144,9 @@ public abstract class Operation
 
     protected void error(String message) throws IOException
     {
-        if (!state.settings.command.ignoreErrors)
+        if (!settings.command.ignoreErrors)
             throw new IOException(message);
-        else if (state.settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
+        else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
             System.err.println(message);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 07ba1d8..2105a72 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -29,10 +30,15 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.stress.operations.*;
+
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.SeedGenerator;
+import org.apache.cassandra.stress.operations.OpDistribution;
+import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.settings.*;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.transport.SimpleClient;
 
 public class StressAction implements Runnable
@@ -52,7 +58,8 @@ public class StressAction implements Runnable
         // creating keyspace and column families
         settings.maybeCreateKeyspaces();
 
-        warmup(settings.command.type, settings.command);
+        if (!settings.command.noWarmup)
+            warmup(settings.command.getFactory(settings));
 
         output.println("Sleeping 2s...");
         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
@@ -61,7 +68,7 @@ public class StressAction implements Runnable
         if (settings.rate.auto)
             success = runAuto();
         else
-            success = null != run(settings.command.type, settings.rate.threadCount, settings.command.count, output);
+            success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count, output);
 
         if (success)
             output.println("END");
@@ -72,33 +79,18 @@ public class StressAction implements Runnable
     }
 
     // type provided separately to support recursive call for mixed command with each command type it is performing
-    private void warmup(Command type, SettingsCommand command)
+    private void warmup(OpDistributionFactory operations)
     {
         // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
-        int iterations;
-        switch (type.category)
+        int iterations = 50000 * settings.node.nodes.size();
+        for (OpDistributionFactory single : operations.each())
         {
-            case BASIC:
-                iterations = 50000;
-                break;
-            case MIXED:
-                for (Command subtype : ((SettingsCommandMixed) command).getCommands())
-                    warmup(subtype, command);
-                return;
-            case MULTI:
-                int keysAtOnce = command.keysAtOnce;
-                iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce));
-                break;
-            default:
-                throw new IllegalStateException();
+            // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
+            // so warm up all the nodes we're speaking to only.
+            output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
+            run(single, 20, iterations, warmupOutput);
         }
-
-        // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
-        // so warm up all the nodes we're speaking to only.
-        iterations *= settings.node.nodes.size();
-        output.println(String.format("Warming up %s with %d iterations...", type, iterations));
-        run(type, 20, iterations, warmupOutput);
     }
 
     // TODO : permit varying more than just thread count
@@ -113,7 +105,7 @@ public class StressAction implements Runnable
         {
             output.println(String.format("Running with %d threadCount", threadCount));
 
-            StressMetrics result = run(settings.command.type, threadCount, settings.command.count, output);
+            StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count, output);
             if (result == null)
                 return false;
             results.add(result);
@@ -170,13 +162,13 @@ public class StressAction implements Runnable
         return improvement / count;
     }
 
-    private StressMetrics run(Command type, int threadCount, long opCount, PrintStream output)
+    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, PrintStream output)
     {
 
         output.println(String.format("Running %s with %d threads %s",
-                type.toString(),
-                threadCount,
-                opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
+                                     operations.desc(),
+                                     threadCount,
+                                     opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
         final WorkQueue workQueue;
         if (opCount < 0)
             workQueue = new ContinuousWorkQueue(50);
@@ -193,7 +185,7 @@ public class StressAction implements Runnable
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];
         for (int i = 0; i < threadCount; i++)
-            consumers[i] = new Consumer(type, done, workQueue, metrics, rateLimiter);
+            consumers[i] = new Consumer(operations, done, workQueue, metrics, rateLimiter);
 
         // starting worker threadCount
         for (int i = 0; i < threadCount; i++)
@@ -236,18 +228,24 @@ public class StressAction implements Runnable
     private class Consumer extends Thread
     {
 
-        private final Operation.State state;
+        private final OpDistribution operations;
+        private final StressMetrics metrics;
+        private final Timer timer;
+        private final SeedGenerator seedGenerator;
         private final RateLimiter rateLimiter;
         private volatile boolean success = true;
         private final WorkQueue workQueue;
         private final CountDownLatch done;
 
-        public Consumer(Command type, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
             this.workQueue = workQueue;
-            this.state = new Operation.State(type, settings, metrics);
+            this.metrics = metrics;
+            this.timer = metrics.getTiming().newTimer();
+            this.seedGenerator = settings.keys.newSeedGenerator();
+            this.operations = operations.get(timer);
         }
 
         public void run()
@@ -269,63 +267,89 @@ public class StressAction implements Runnable
                         sclient = settings.getSimpleNativeClient();
                         break;
                     case THRIFT:
-                        tclient = settings.getThriftClient();
-                        break;
                     case THRIFT_SMART:
-                        tclient = settings.getSmartThriftClient();
+                        tclient = settings.getThriftClient();
                         break;
                     default:
                         throw new IllegalStateException();
                 }
 
-                Work work;
-                while ( null != (work = workQueue.poll()) )
+                int maxBatchSize = operations.maxBatchSize();
+                Work work = workQueue.poll();
+                Partition[] partitions = new Partition[maxBatchSize];
+                int workDone = 0;
+                while (work != null)
                 {
 
-                    if (rateLimiter != null)
-                        rateLimiter.acquire(work.count);
+                    Operation op = operations.next();
+                    op.generator.reset();
+                    int batchSize = Math.max(1, (int) op.partitionCount.next());
+                    int partitionCount = 0;
 
-                    for (int i = 0 ; i < work.count ; i++)
+                    while (partitionCount < batchSize)
                     {
-                        try
+                        int count = Math.min((work.count - workDone), batchSize - partitionCount);
+                        for (int i = 0 ; i < count ; i++)
                         {
-                            Operation op = createOperation(state, i + work.offset);
-                            switch (settings.mode.api)
-                            {
-                                case JAVA_DRIVER_NATIVE:
-                                    op.run(jclient);
-                                    break;
-                                case SIMPLE_NATIVE:
-                                    op.run(sclient);
-                                    break;
-                                case THRIFT:
-                                case THRIFT_SMART:
-                                default:
-                                    op.run(tclient);
-                            }
-                        } catch (Exception e)
+                            long seed = seedGenerator.next(work.offset + workDone + i);
+                            partitions[partitionCount + i] = op.generator.generate(seed);
+                        }
+                        workDone += count;
+                        partitionCount += count;
+                        if (workDone == work.count)
                         {
-                            if (output == null)
+                            workDone = 0;
+                            work = workQueue.poll();
+                            if (work == null)
                             {
-                                System.err.println(e.getMessage());
-                                success = false;
-                                System.exit(-1);
+                                if (partitionCount == 0)
+                                    return;
+                                break;
                             }
+                            if (rateLimiter != null)
+                                rateLimiter.acquire(work.count);
+                        }
+                    }
+
+                    op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount));
 
-                            e.printStackTrace(output);
+                    try
+                    {
+                        switch (settings.mode.api)
+                        {
+                            case JAVA_DRIVER_NATIVE:
+                                op.run(jclient);
+                                break;
+                            case SIMPLE_NATIVE:
+                                op.run(sclient);
+                                break;
+                            case THRIFT:
+                            case THRIFT_SMART:
+                            default:
+                                op.run(tclient);
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        if (output == null)
+                        {
+                            System.err.println(e.getMessage());
                             success = false;
-                            workQueue.stop();
-                            state.metrics.cancel();
-                            return;
+                            System.exit(-1);
                         }
+
+                        e.printStackTrace(output);
+                        success = false;
+                        workQueue.stop();
+                        metrics.cancel();
+                        return;
                     }
                 }
-
             }
             finally
             {
                 done.countDown();
-                state.timer.close();
+                timer.close();
             }
 
         }
@@ -443,106 +467,4 @@ public class StressAction implements Runnable
 
     }
 
-    private Operation createOperation(Operation.State state, long index)
-    {
-        return createOperation(state.type, state, index);
-    }
-    private Operation createOperation(Command type, Operation.State state, long index)
-    {
-        switch (type)
-        {
-            case READ:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftReader(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlReader(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-
-            case COUNTER_READ:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftCounterGetter(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlCounterGetter(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case WRITE:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftInserter(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlInserter(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case COUNTER_WRITE:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftCounterAdder(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlCounterAdder(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case RANGE_SLICE:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftRangeSlicer(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlRangeSlicer(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case INDEXED_RANGE_SLICE:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftIndexedRangeSlicer(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlIndexedRangeSlicer(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case READ_MULTI:
-                switch(state.settings.mode.style)
-                {
-                    case THRIFT:
-                        return new ThriftMultiGetter(state, index);
-                    case CQL:
-                    case CQL_PREPARED:
-                        return new CqlMultiGetter(state, index);
-                    default:
-                        throw new UnsupportedOperationException();
-                }
-
-            case MIXED:
-                Command subcommand = state.commandSelector.next();
-                return createOperation(subcommand, state.substate(subcommand), index);
-
-        }
-
-        throw new UnsupportedOperationException();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 54a1e2c..7e5c1b6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -127,20 +127,21 @@ public class StressMetrics
 
     // PRINT FORMATTING
 
-    public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
-    public static final String ROWFORMAT =  "%-10d,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
+    public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
+    public static final String ROWFORMAT =  "%-10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
 
     private static void printHeader(String prefix, PrintStream output)
     {
-        output.println(prefix + String.format(HEADFORMAT, "ops","op/s", "key/s","mean","med",".95",".99",".999","max","time","stderr"));
+        output.println(prefix + String.format(HEADFORMAT, "partitions","op/s", "pk/s", "row/s","mean","med",".95",".99",".999","max","time","stderr"));
     }
 
     private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
     {
         output.println(prefix + String.format(ROWFORMAT,
-                total.operationCount,
+                total.partitionCount,
                 interval.realOpRate(),
-                interval.keyRate(),
+                interval.partitionRate(),
+                interval.rowRate(),
                 interval.meanLatency(),
                 interval.medianLatency(),
                 interval.rankLatency(0.95f),
@@ -156,9 +157,9 @@ public class StressMetrics
         output.println("\n");
         output.println("Results:");
         TimingInterval history = timing.getHistory();
-        output.println(String.format("real op rate              : %.0f", history.realOpRate()));
-        output.println(String.format("adjusted op rate stderr   : %.0f", opRateUncertainty.getUncertainty()));
-        output.println(String.format("key rate                  : %.0f", history.keyRate()));
+        output.println(String.format("op rate                   : %.0f", history.realOpRate()));
+        output.println(String.format("partition rate            : %.0f", history.partitionRate()));
+        output.println(String.format("row rate                  : %.0f", history.rowRate()));
         output.println(String.format("latency mean              : %.1f", history.meanLatency()));
         output.println(String.format("latency median            : %.1f", history.medianLatency()));
         output.println(String.format("latency 95th percentile   : %.1f", history.rankLatency(.95f)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
new file mode 100644
index 0000000..13f26a2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.stress;
+
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.RatioDistributionFactory;
+import org.apache.cassandra.stress.generate.values.Booleans;
+import org.apache.cassandra.stress.generate.values.Bytes;
+import org.apache.cassandra.stress.generate.values.Generator;
+import org.apache.cassandra.stress.generate.values.Dates;
+import org.apache.cassandra.stress.generate.values.Doubles;
+import org.apache.cassandra.stress.generate.values.Floats;
+import org.apache.cassandra.stress.generate.values.GeneratorConfig;
+import org.apache.cassandra.stress.generate.values.Inets;
+import org.apache.cassandra.stress.generate.values.Integers;
+import org.apache.cassandra.stress.generate.values.Lists;
+import org.apache.cassandra.stress.generate.values.Longs;
+import org.apache.cassandra.stress.generate.values.Sets;
+import org.apache.cassandra.stress.generate.values.Strings;
+import org.apache.cassandra.stress.generate.values.TimeUUIDs;
+import org.apache.cassandra.stress.generate.values.UUIDs;
+import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
+import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
+import org.apache.cassandra.stress.settings.OptionDistribution;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.thrift.TException;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+import org.yaml.snakeyaml.error.YAMLException;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StressProfile implements Serializable
+{
+    private String keyspaceCql;
+    private String tableCql;
+    private String seedStr;
+
+    public String keyspaceName;
+    public String tableName;
+    private Map<String, GeneratorConfig> columnConfigs;
+    private Map<String, String> queries;
+    private Map<String, String> insert;
+
+    transient volatile TableMetadata tableMetaData;
+
+    transient volatile GeneratorFactory generatorFactory;
+
+    transient volatile BatchStatement.Type batchType;
+    transient volatile DistributionFactory partitions;
+    transient volatile RatioDistributionFactory pervisit;
+    transient volatile RatioDistributionFactory perbatch;
+    transient volatile PreparedStatement insertStatement;
+    transient volatile Integer thriftInsertId;
+
+    transient volatile Map<String, PreparedStatement> queryStatements;
+    transient volatile Map<String, Integer> thriftQueryIds;
+
+    private void init(StressYaml yaml) throws RequestValidationException
+    {
+        keyspaceName = yaml.keyspace;
+        keyspaceCql = yaml.keyspace_definition;
+        tableName = yaml.table;
+        tableCql = yaml.table_definition;
+        seedStr = yaml.seed;
+        queries = yaml.queries;
+        insert = yaml.insert;
+
+        assert keyspaceName != null : "keyspace name is required in yaml file";
+        assert tableName != null : "table name is required in yaml file";
+        assert queries != null : "queries map is required in yaml file";
+
+        if (keyspaceCql != null && keyspaceCql.length() > 0)
+        {
+            String name = ((CreateKeyspaceStatement) QueryProcessor.parseStatement(keyspaceCql)).keyspace();
+            assert name.equalsIgnoreCase(keyspaceName) : "Name in keyspace_definition doesn't match keyspace property: '" + name + "' != '" + keyspaceName + "'";
+        }
+        else
+        {
+            keyspaceCql = null;
+        }
+
+        if (tableCql != null && tableCql.length() > 0)
+        {
+            String name = CFMetaData.compile(tableCql, keyspaceName).cfName;
+            assert name.equalsIgnoreCase(tableName) : "Name in table_definition doesn't match table property: '" + name + "' != '" + tableName + "'";
+        }
+        else
+        {
+            tableCql = null;
+        }
+
+        columnConfigs = new HashMap<>();
+        for (Map<String,Object> spec : yaml.columnspec)
+        {
+            lowerCase(spec);
+            String name = (String) spec.remove("name");
+            DistributionFactory population = !spec.containsKey("population") ? null : OptionDistribution.get((String) spec.remove("population"));
+            DistributionFactory size = !spec.containsKey("size") ? null : OptionDistribution.get((String) spec.remove("size"));
+            DistributionFactory clustering = !spec.containsKey("cluster") ? null : OptionDistribution.get((String) spec.remove("cluster"));
+
+            if (!spec.isEmpty())
+                throw new IllegalArgumentException("Unrecognised option(s) in column spec: " + spec);
+            if (name == null)
+                throw new IllegalArgumentException("Missing name argument in column spec");
+
+            GeneratorConfig config = new GeneratorConfig(yaml.seed + name, clustering, size, population);
+            columnConfigs.put(name, config);
+        }
+    }
+
+    public void maybeCreateSchema(StressSettings settings)
+    {
+        JavaDriverClient client = settings.getJavaDriverClient(false);
+
+        if (keyspaceCql != null)
+        {
+            try
+            {
+                client.execute(keyspaceCql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+            }
+            catch (AlreadyExistsException e)
+            {
+            }
+        }
+
+        client.execute("use "+keyspaceName, org.apache.cassandra.db.ConsistencyLevel.ONE);
+
+        if (tableCql != null)
+        {
+            try
+            {
+                client.execute(tableCql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+            }
+            catch (AlreadyExistsException e)
+            {
+            }
+
+            System.out.println(String.format("Created schema. Sleeping %ss for propagation.", settings.node.nodes.size()));
+            Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+        }
+
+        maybeLoadSchemaInfo(settings);
+    }
+
+
+    private void maybeLoadSchemaInfo(StressSettings settings)
+    {
+        if (tableMetaData == null)
+        {
+            JavaDriverClient client = settings.getJavaDriverClient();
+
+            synchronized (client)
+            {
+
+                if (tableMetaData != null)
+                    return;
+
+                TableMetadata metadata = client.getCluster()
+                                               .getMetadata()
+                                               .getKeyspace(keyspaceName)
+                                               .getTable(tableName);
+
+                //Fill in missing column configs
+                for (ColumnMetadata col : metadata.getColumns())
+                {
+                    if (columnConfigs.containsKey(col.getName()))
+                        continue;
+
+                    columnConfigs.put(col.getName(), new GeneratorConfig(seedStr + col.getName(), null, null, null));
+                }
+
+                tableMetaData = metadata;
+            }
+        }
+    }
+
+    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, StressSettings settings)
+    {
+        if (queryStatements == null)
+        {
+            synchronized (this)
+            {
+                if (queryStatements == null)
+                {
+                    try
+                    {
+                        JavaDriverClient jclient = settings.getJavaDriverClient();
+                        ThriftClient tclient = settings.getThriftClient();
+                        Map<String, PreparedStatement> stmts = new HashMap<>();
+                        Map<String, Integer> tids = new HashMap<>();
+                        for (Map.Entry<String, String> e : queries.entrySet())
+                        {
+                            stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue()));
+                            tids.put(e.getKey().toLowerCase(), tclient.prepare_cql3_query(e.getValue(), Compression.NONE));
+                        }
+                        thriftQueryIds = tids;
+                        queryStatements = stmts;
+                    }
+                    catch (TException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+
+        // TODO validation
+        name = name.toLowerCase();
+        return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL);
+    }
+
    /**
     * Lazily builds (once, under double-checked locking on this profile) a prepared
     * UPDATE statement covering every column of the table, then returns a SchemaInsert
     * bound to it. An UPDATE (upsert) is used rather than INSERT so that counter and
     * collection columns can be written via "col = col + ?".
     *
     * Side effects on first call: populates insertStatement, thriftInsertId, partitions,
     * pervisit, perbatch and batchType, and consumes (empties) the 'insert' options map.
     */
    public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings)
    {
        if (insertStatement == null)
        {
            synchronized (this)
            {
                if (insertStatement == null)
                {
                    // need table metadata to enumerate columns
                    maybeLoadSchemaInfo(settings);

                    Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());

                    //Non PK Columns go into the SET clause
                    StringBuilder sb = new StringBuilder();

                    // NOTE(review): the table name is double-quoted but column names below are
                    // not — case-sensitive column names would break; confirm intended.
                    sb.append("UPDATE \"").append(tableName).append("\" SET ");

                    //PK Columns accumulate separately into the WHERE clause
                    StringBuilder pred = new StringBuilder();
                    pred.append(" WHERE ");

                    // first* flags suppress the leading separator for each clause
                    boolean firstCol = true;
                    boolean firstPred = true;
                    for (ColumnMetadata c : tableMetaData.getColumns())
                    {

                        if (keyColumns.contains(c))
                        {
                            if (firstPred)
                                firstPred = false;
                            else
                                pred.append(" AND ");

                            pred.append(c.getName()).append(" = ?");
                        }
                        else
                        {
                            if (firstCol)
                                firstCol = false;
                            else
                                sb.append(",");

                            sb.append(c.getName()).append(" = ");

                            // collections and counters are additive; everything else is a plain bind
                            switch (c.getType().getName())
                            {
                                case SET:
                                case LIST:
                                case COUNTER:
                                    sb.append(c.getName()).append(" + ?");
                                    break;
                                default:
                                    sb.append("?");
                                    break;
                            }
                        }
                    }

                    //Put PK predicates at the end
                    sb.append(pred);

                    // normalise the YAML 'insert' option map, consuming each recognised key
                    // via remove() so leftovers can be reported as errors below
                    if (insert == null)
                        insert = new HashMap<>();
                    lowerCase(insert);

                    partitions = OptionDistribution.get(!insert.containsKey("partitions") ? "fixed(1)" : insert.remove("partitions"));
                    pervisit = OptionRatioDistribution.get(!insert.containsKey("pervisit") ? "fixed(1)/1" : insert.remove("pervisit"));
                    perbatch = OptionRatioDistribution.get(!insert.containsKey("perbatch") ? "fixed(1)/1" : insert.remove("perbatch"));
                    batchType = !insert.containsKey("batchtype") ? BatchStatement.Type.UNLOGGED : BatchStatement.Type.valueOf(insert.remove("batchtype"));
                    if (!insert.isEmpty())
                        throw new IllegalArgumentException("Unrecognised insert option(s): " + insert);

                    // warn (not fail) when the schema/options imply very large partitions or batches
                    if (generator.maxRowCount > 100 * 1000 * 1000)
                        System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))\n", generator.maxRowCount);
                    if (perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount > 100000)
                        System.err.printf("WARNING: You have defined a schema that permits very large batches (%.0f max rows (>100K)). This may OOM this stress client, or the server.\n",
                                           perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount);

                    // prepare once for each client protocol
                    JavaDriverClient client = settings.getJavaDriverClient();
                    String query = sb.toString();
                    try
                    {
                        thriftInsertId = settings.getThriftClient().prepare_cql3_query(query, Compression.NONE);
                    }
                    catch (TException e)
                    {
                        throw new RuntimeException(e);
                    }
                    insertStatement = client.prepare(query);
                }
            }
        }

        // fresh distribution instances per SchemaInsert so each operation owns its own state
        return new SchemaInsert(timer, generator, settings, partitions.get(), pervisit.get(), perbatch.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
    }
+
+    public PartitionGenerator newGenerator(StressSettings settings)
+    {
+        if (generatorFactory == null)
+        {
+            synchronized (this)
+            {
+                maybeLoadSchemaInfo(settings);
+                if (generatorFactory == null)
+                    generatorFactory = new GeneratorFactory();
+            }
+        }
+
+        return generatorFactory.newGenerator();
+    }
+
+    private class GeneratorFactory
+    {
+        final List<ColumnInfo> partitionKeys = new ArrayList<>();
+        final List<ColumnInfo> clusteringColumns = new ArrayList<>();
+        final List<ColumnInfo> valueColumns = new ArrayList<>();
+
+        private GeneratorFactory()
+        {
+            Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
+
+            for (ColumnMetadata metadata : tableMetaData.getPartitionKey())
+                partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+            for (ColumnMetadata metadata : tableMetaData.getClusteringColumns())
+                clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+            for (ColumnMetadata metadata : tableMetaData.getColumns())
+                if (!keyColumns.contains(metadata))
+                    valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+        }
+
+        PartitionGenerator newGenerator()
+        {
+            return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns));
+        }
+
+        List<Generator> get(List<ColumnInfo> columnInfos)
+        {
+            List<Generator> result = new ArrayList<>();
+            for (ColumnInfo columnInfo : columnInfos)
+                result.add(columnInfo.getGenerator());
+            return result;
+        }
+    }
+
+    static class ColumnInfo
+    {
+        final String name;
+        final DataType type;
+        final GeneratorConfig config;
+
+        ColumnInfo(String name, DataType type, GeneratorConfig config)
+        {
+            this.name = name;
+            this.type = type;
+            this.config = config;
+        }
+
+        Generator getGenerator()
+        {
+            return getGenerator(name, type, config);
+        }
+
+        static Generator getGenerator(final String name, final DataType type, GeneratorConfig config)
+        {
+            switch (type.getName())
+            {
+                case ASCII:
+                case TEXT:
+                case VARCHAR:
+                    return new Strings(name, config);
+                case BIGINT:
+                case COUNTER:
+                    return new Longs(name, config);
+                case BLOB:
+                    return new Bytes(name, config);
+                case BOOLEAN:
+                    return new Booleans(name, config);
+                case DECIMAL:
+                case DOUBLE:
+                    return new Doubles(name, config);
+                case FLOAT:
+                    return new Floats(name, config);
+                case INET:
+                    return new Inets(name, config);
+                case INT:
+                case VARINT:
+                    return new Integers(name, config);
+                case TIMESTAMP:
+                    return new Dates(name, config);
+                case UUID:
+                    return new UUIDs(name, config);
+                case TIMEUUID:
+                    return new TimeUUIDs(name, config);
+                case SET:
+                    return new Sets(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+                case LIST:
+                    return new Lists(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+    }
+
+    public static StressProfile load(File file) throws IOError
+    {
+        try
+        {
+            byte[] profileBytes = Files.readAllBytes(Paths.get(file.toURI()));
+
+            Constructor constructor = new Constructor(StressYaml.class);
+
+            Yaml yaml = new Yaml(constructor);
+
+            StressYaml profileYaml = yaml.loadAs(new ByteArrayInputStream(profileBytes), StressYaml.class);
+
+            StressProfile profile = new StressProfile();
+            profile.init(profileYaml);
+
+            return profile;
+        }
+        catch (YAMLException | IOException | RequestValidationException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    static <V> void lowerCase(Map<String, V> map)
+    {
+        List<Map.Entry<String, V>> reinsert = new ArrayList<>();
+        Iterator<Map.Entry<String, V>> iter = map.entrySet().iterator();
+        while (iter.hasNext())
+        {
+            Map.Entry<String, V> e = iter.next();
+            if (!e.getKey().toLowerCase().equalsIgnoreCase(e.getKey()))
+            {
+                reinsert.add(e);
+                iter.remove();
+            }
+        }
+        for (Map.Entry<String, V> e : reinsert)
+            map.put(e.getKey().toLowerCase(), e.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
new file mode 100644
index 0000000..e94fa77
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.stress;
+
+import java.util.List;
+import java.util.Map;
+
/**
 * Raw bean that SnakeYAML binds a stress profile file onto; field names must
 * match the YAML keys exactly, and fields are public for reflective population.
 */
public class StressYaml
{
    // seed string used to derive per-column generator seeds
    public String seed;
    // target keyspace name, and optional CQL to create it
    public String keyspace;
    public String keyspace_definition;
    // target table name, and optional CQL to create it
    public String table;
    public String table_definition;

    // per-column generator settings, one map of options per column
    public List<Map<String,Object>> columnspec;
    // named CQL query strings, keyed by query name
    public Map<String,String> queries;
    // insert-operation options (partitions, pervisit, perbatch, batchtype)
    public Map<String,String> insert;

}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java b/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
new file mode 100644
index 0000000..4662454
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
@@ -0,0 +1,57 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.Serializable;
+
/**
 * Base class for the value/size distributions used by stress generators.
 * Implementations must be deterministic for a given seed.
 */
public abstract class Distribution implements Serializable
{

    /** @return the next sampled value, as a long */
    public abstract long next();
    /** @return the next sampled value, without truncation to a long */
    public abstract double nextDouble();
    /** @return the value at the given cumulative probability (inverse CDF), in [0, 1] */
    public abstract long inverseCumProb(double cumProb);
    /** Reseeds the underlying randomness so sequences are reproducible. */
    public abstract void setSeed(long seed);

    public long maxValue()
    {
        return inverseCumProb(1d);
    }

    public long minValue()
    {
        return inverseCumProb(0d);
    }

    /**
     * Approximation of the mean: averages the inverse CDF at 51 evenly spaced
     * points in [0, 1]. Slightly costly to calculate, so should not be invoked
     * frequently.
     */
    public long average()
    {
        // BUG FIX: iterate with an integer counter; the original accumulated a float
        // in 0.02 steps, whose rounding drift could skip or repeat the final sample
        final int steps = 50;
        double sum = 0;
        for (int i = 0 ; i <= steps ; i++)
            sum += inverseCumProb(i / (double) steps);
        return (long) (sum / (steps + 1));
    }

}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
new file mode 100644
index 0000000..23ce3e9
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+public class DistributionBoundApache extends Distribution
+{
+
+    final AbstractRealDistribution delegate;
+    final long min, max;
+
+    public DistributionBoundApache(AbstractRealDistribution delegate, long min, long max)
+    {
+        this.delegate = delegate;
+        this.min = min;
+        this.max = max;
+    }
+
+    @Override
+    public long next()
+    {
+        return bound(min, max, delegate.sample());
+    }
+
+    public double nextDouble()
+    {
+        return boundDouble(min, max, delegate.sample());
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return bound(min, max, delegate.inverseCumulativeProbability(cumProb));
+    }
+
+    public void setSeed(long seed)
+    {
+        delegate.reseedRandomGenerator(seed);
+    }
+
+    private static long bound(long min, long max, double val)
+    {
+        long r = (long) val;
+        if ((r >= min) & (r <= max))
+            return r;
+        if (r < min)
+            return min;
+        if (r > max)
+            return max;
+        throw new IllegalStateException();
+    }
+
+    private static double boundDouble(long min, long max, double r)
+    {
+        if ((r >= min) & (r <= max))
+            return r;
+        if (r < min)
+            return min;
+        if (r > max)
+            return max;
+        throw new IllegalStateException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
new file mode 100644
index 0000000..d0dfa89
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
@@ -0,0 +1,31 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.Serializable;
+
/**
 * Serializable factory for Distribution instances; callers invoke {@link #get()}
 * each time they need a fresh distribution (e.g. one per constructed operation).
 */
public interface DistributionFactory extends Serializable
{

    /** @return a new Distribution instance */
    Distribution get();

}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
new file mode 100644
index 0000000..bbfb894
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
@@ -0,0 +1,54 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+public class DistributionFixed extends Distribution
+{
+
+    final long key;
+
+    public DistributionFixed(long key)
+    {
+        this.key = key;
+    }
+
+    @Override
+    public long next()
+    {
+        return key;
+    }
+
+    public double nextDouble()
+    {
+        return key;
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return key;
+    }
+
+    public void setSeed(long seed)
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
new file mode 100644
index 0000000..df52cb8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.stress.generate;
+
+public class DistributionInverted extends Distribution
+{
+
+    final Distribution wrapped;
+    final long min;
+    final long max;
+
+    public DistributionInverted(Distribution wrapped)
+    {
+        this.wrapped = wrapped;
+        this.min = wrapped.minValue();
+        this.max = wrapped.maxValue();
+    }
+
+    public long next()
+    {
+        return max - (wrapped.next() - min);
+    }
+
+    public double nextDouble()
+    {
+        return max - (wrapped.nextDouble() - min);
+    }
+
+    public long inverseCumProb(double cumProb)
+    {
+        return max - (wrapped.inverseCumProb(cumProb) - min);
+    }
+
+    public void setSeed(long seed)
+    {
+        wrapped.setSeed(seed);
+    }
+
+}