You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/07 19:34:43 UTC
[20/23] git commit: Introduce CQL support for stress tool
Introduce CQL support for stress tool
patch by Jake Luciani and Benedict Elliott Smith
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75364296
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75364296
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75364296
Branch: refs/heads/cassandra-2.1.0
Commit: 75364296cb3749dfa6d4307cf055f1a18bcf7a9b
Parents: 61ee287
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jul 7 18:03:59 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jul 7 18:25:11 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/thrift/ThriftConversion.java | 19 +
.../org/apache/cassandra/utils/UUIDGen.java | 8 +
tools/bin/cassandra-stress | 2 +-
tools/cqlstress-counter-example.yaml | 85 +++
tools/cqlstress-example.yaml | 99 +++
tools/cqlstress-insanity-example.yaml | 102 +++
.../org/apache/cassandra/stress/Operation.java | 218 +-----
.../apache/cassandra/stress/StressAction.java | 262 +++----
.../apache/cassandra/stress/StressMetrics.java | 17 +-
.../apache/cassandra/stress/StressProfile.java | 504 +++++++++++++
.../org/apache/cassandra/stress/StressYaml.java | 38 +
.../cassandra/stress/generate/Distribution.java | 57 ++
.../generate/DistributionBoundApache.java | 84 +++
.../stress/generate/DistributionFactory.java | 31 +
.../stress/generate/DistributionFixed.java | 54 ++
.../stress/generate/DistributionInverted.java | 37 +
.../generate/DistributionOffsetApache.java | 80 +++
.../cassandra/stress/generate/Partition.java | 343 +++++++++
.../stress/generate/PartitionGenerator.java | 80 +++
.../stress/generate/RatioDistribution.java | 25 +
.../generate/RatioDistributionFactory.java | 31 +
.../apache/cassandra/stress/generate/Row.java | 22 +
.../stress/generate/SeedGenerator.java | 8 +
.../stress/generate/SeedRandomGenerator.java | 33 +
.../stress/generate/SeedSeriesGenerator.java | 21 +
.../stress/generate/values/Booleans.java | 37 +
.../cassandra/stress/generate/values/Bytes.java | 54 ++
.../cassandra/stress/generate/values/Dates.java | 46 ++
.../stress/generate/values/Doubles.java | 37 +
.../stress/generate/values/Floats.java | 37 +
.../stress/generate/values/Generator.java | 50 ++
.../stress/generate/values/GeneratorConfig.java | 68 ++
.../stress/generate/values/HexBytes.java | 56 ++
.../stress/generate/values/HexStrings.java | 55 ++
.../cassandra/stress/generate/values/Inets.java | 57 ++
.../stress/generate/values/Integers.java | 38 +
.../cassandra/stress/generate/values/Lists.java | 55 ++
.../cassandra/stress/generate/values/Longs.java | 37 +
.../cassandra/stress/generate/values/Sets.java | 54 ++
.../stress/generate/values/Strings.java | 49 ++
.../stress/generate/values/TimeUUIDs.java | 51 ++
.../cassandra/stress/generate/values/UUIDs.java | 39 +
.../cassandra/stress/generatedata/DataGen.java | 39 -
.../stress/generatedata/DataGenBytesRandom.java | 45 --
.../stress/generatedata/DataGenFactory.java | 30 -
.../stress/generatedata/DataGenHex.java | 60 --
.../DataGenHexFromDistribution.java | 66 --
.../generatedata/DataGenHexFromOpIndex.java | 48 --
.../generatedata/DataGenStringDictionary.java | 107 ---
.../generatedata/DataGenStringRepeats.java | 90 ---
.../stress/generatedata/Distribution.java | 40 --
.../generatedata/DistributionBoundApache.java | 63 --
.../generatedata/DistributionFactory.java | 31 -
.../stress/generatedata/DistributionFixed.java | 46 --
.../generatedata/DistributionOffsetApache.java | 61 --
.../generatedata/DistributionSeqBatch.java | 68 --
.../cassandra/stress/generatedata/KeyGen.java | 54 --
.../cassandra/stress/generatedata/RowGen.java | 53 --
.../generatedata/RowGenDistributedSize.java | 116 ---
.../stress/operations/CqlCounterAdder.java | 77 --
.../stress/operations/CqlCounterGetter.java | 69 --
.../operations/CqlIndexedRangeSlicer.java | 118 ---
.../stress/operations/CqlInserter.java | 88 ---
.../stress/operations/CqlMultiGetter.java | 42 --
.../stress/operations/CqlOperation.java | 698 ------------------
.../stress/operations/CqlRangeSlicer.java | 59 --
.../cassandra/stress/operations/CqlReader.java | 91 ---
.../stress/operations/FixedOpDistribution.java | 25 +
.../stress/operations/OpDistribution.java | 11 +
.../operations/OpDistributionFactory.java | 12 +
.../operations/SampledOpDistribution.java | 41 ++
.../SampledOpDistributionFactory.java | 72 ++
.../stress/operations/ThriftCounterAdder.java | 95 ---
.../stress/operations/ThriftCounterGetter.java | 68 --
.../operations/ThriftIndexedRangeSlicer.java | 114 ---
.../stress/operations/ThriftInserter.java | 117 ---
.../stress/operations/ThriftMultiGetter.java | 80 ---
.../stress/operations/ThriftRangeSlicer.java | 85 ---
.../stress/operations/ThriftReader.java | 94 ---
.../operations/predefined/CqlCounterAdder.java | 84 +++
.../operations/predefined/CqlCounterGetter.java | 74 ++
.../operations/predefined/CqlInserter.java | 79 ++
.../operations/predefined/CqlOperation.java | 714 +++++++++++++++++++
.../stress/operations/predefined/CqlReader.java | 87 +++
.../predefined/PredefinedOperation.java | 248 +++++++
.../predefined/ThriftCounterAdder.java | 86 +++
.../predefined/ThriftCounterGetter.java | 66 ++
.../operations/predefined/ThriftInserter.java | 96 +++
.../operations/predefined/ThriftReader.java | 79 ++
.../operations/userdefined/SchemaInsert.java | 144 ++++
.../operations/userdefined/SchemaQuery.java | 86 +++
.../operations/userdefined/SchemaStatement.java | 164 +++++
.../cassandra/stress/settings/Command.java | 24 +-
.../stress/settings/CommandCategory.java | 4 +-
.../stress/settings/OptionAnyProbabilities.java | 78 ++
.../stress/settings/OptionDataGen.java | 203 ------
.../stress/settings/OptionDistribution.java | 75 +-
.../settings/OptionEnumProbabilities.java | 62 ++
.../cassandra/stress/settings/OptionMulti.java | 3 +-
.../settings/OptionRatioDistribution.java | 166 +++++
.../cassandra/stress/settings/OptionSimple.java | 5 +-
.../stress/settings/SettingsColumn.java | 50 +-
.../stress/settings/SettingsCommand.java | 58 +-
.../stress/settings/SettingsCommandMixed.java | 207 ------
.../settings/SettingsCommandPreDefined.java | 145 ++++
.../SettingsCommandPreDefinedMixed.java | 151 ++++
.../stress/settings/SettingsCommandUser.java | 135 ++++
.../cassandra/stress/settings/SettingsKey.java | 27 +-
.../cassandra/stress/settings/SettingsMisc.java | 2 +-
.../cassandra/stress/settings/SettingsMode.java | 6 +-
.../stress/settings/SettingsSchema.java | 77 +-
.../stress/settings/SettingsTransport.java | 2 +-
.../stress/settings/StressSettings.java | 36 +-
.../stress/settings/ValidationType.java | 8 +
.../cassandra/stress/util/JavaDriverClient.java | 19 +-
.../stress/util/SmartThriftClient.java | 4 +-
.../org/apache/cassandra/stress/util/Timer.java | 17 +-
.../cassandra/stress/util/TimingInterval.java | 26 +-
119 files changed, 5861 insertions(+), 3890 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 491ad6d..ff2f586 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc3
+ * Introduce CQL support for stress tool (CASSANDRA-6146)
* Fix ClassCastException processing expired messages (CASSANDRA-7496)
* Fix prepared marker for collections inside UDT (CASSANDRA-7472)
* Remove left-over populate_io_cache_on_flush and replicate_on_write
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 9573b56..0c75d2c 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -51,6 +51,25 @@ public class ThriftConversion
throw new AssertionError();
}
+ public static ConsistencyLevel toThrift(org.apache.cassandra.db.ConsistencyLevel cl)
+ {
+ switch (cl)
+ {
+ case ANY: return ConsistencyLevel.ANY;
+ case ONE: return ConsistencyLevel.ONE;
+ case TWO: return ConsistencyLevel.TWO;
+ case THREE: return ConsistencyLevel.THREE;
+ case QUORUM: return ConsistencyLevel.QUORUM;
+ case ALL: return ConsistencyLevel.ALL;
+ case LOCAL_QUORUM: return ConsistencyLevel.LOCAL_QUORUM;
+ case EACH_QUORUM: return ConsistencyLevel.EACH_QUORUM;
+ case SERIAL: return ConsistencyLevel.SERIAL;
+ case LOCAL_SERIAL: return ConsistencyLevel.LOCAL_SERIAL;
+ case LOCAL_ONE: return ConsistencyLevel.LOCAL_ONE;
+ }
+ throw new AssertionError();
+ }
+
// We never return, but returning a RuntimeException allows to write "throw rethrow(e)" without java complaining
// for methods that have a return value.
public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index f385744..53293b2 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -25,6 +25,8 @@ import java.util.Collection;
import java.util.Random;
import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The goods are here: www.ietf.org/rfc/rfc4122.txt.
@@ -80,6 +82,12 @@ public class UUIDGen
return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode);
}
+ @VisibleForTesting
+ public static UUID getTimeUUID(long when, long clockSeqAndNode)
+ {
+ return new UUID(createTime(fromUnixTimestamp(when)), clockSeqAndNode);
+ }
+
/** creates a type 1 uuid from raw bytes. */
public static UUID getUUID(ByteBuffer raw)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/bin/cassandra-stress
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra-stress b/tools/bin/cassandra-stress
index 39257cd..c855cf5 100755
--- a/tools/bin/cassandra-stress
+++ b/tools/bin/cassandra-stress
@@ -42,4 +42,4 @@ if [ "x$JAVA" = "x" ]; then
exit 1
fi
-$JAVA -server -cp $CLASSPATH org.apache.cassandra.stress.Stress $@
+$JAVA -server -ea -cp $CLASSPATH org.apache.cassandra.stress.Stress $@
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-counter-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-counter-example.yaml b/tools/cqlstress-counter-example.yaml
new file mode 100644
index 0000000..a65080a
--- /dev/null
+++ b/tools/cqlstress-counter-example.yaml
@@ -0,0 +1,85 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+ CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: counttest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+ CREATE TABLE counttest (
+ name text PRIMARY KEY,
+ count counter
+ ) WITH comment='A table of many types to test wide rows'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows. Supported types are
+#
+# EXP(min..max) An exponential distribution over the range [min..max]
+# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
+# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
+# UNIFORM(min..max) A uniform distribution over the range [min, max]
+# FIXED(val) A fixed distribution, always returning the same value
+# Aliases: extr, gauss, normal, norm, weibull
+#
+# If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), identity: uniform(1..1024)
+#
+
+columnspec:
+ - name: name
+ clustering: uniform(1..100)
+ size: uniform(1..4)
+ - name: count
+ population: fixed(1)
+
+insert:
+ partitions: fixed(1) # number of unique partitions to update in a single operation
+ # if perbatch < 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless already finished); only the row counts will vary
+ pervisit: fixed(1)/1 # ratio of rows each partition should update in a single visit to the partition,
+ # as a proportion of the total possible for the partition
+ perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement,
+ # as a proportion of the proportion we are inserting this visit
+ # (i.e. compounds with (and capped by) pervisit)
+ batchtype: UNLOGGED # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+ simple1: select * from counttest where name = ?
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
new file mode 100644
index 0000000..a997529
--- /dev/null
+++ b/tools/cqlstress-example.yaml
@@ -0,0 +1,99 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+ CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: typestest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+ CREATE TABLE typestest (
+ name text,
+ choice boolean,
+ date timestamp,
+ address inet,
+ dbl double,
+ lval bigint,
+ ival int,
+ uid timeuuid,
+ value blob,
+ PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
+ ) WITH COMPACT STORAGE
+ AND compaction = { 'class':'LeveledCompactionStrategy' }
+ AND comment='A table of many types to test wide rows'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows. Supported types are
+#
+# EXP(min..max) An exponential distribution over the range [min..max]
+# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
+# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
+# UNIFORM(min..max) A uniform distribution over the range [min, max]
+# FIXED(val) A fixed distribution, always returning the same value
+# Aliases: extr, gauss, normal, norm, weibull
+#
+# If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), identity: uniform(1..1024)
+#
+columnspec:
+ - name: name
+ size: uniform(1..10)
+ population: uniform(1..1M) # the range of unique values to select for the field (default is 100Billion)
+ - name: choice
+ - name: date
+ cluster: uniform(1..4)
+ - name: lval
+ population: gaussian(1..1000)
+ cluster: uniform(1..4)
+
+insert:
+ partitions: uniform(1..50) # number of unique partitions to update in a single operation
+ # if perbatch < 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless already finished); only the row counts will vary
+ pervisit: uniform(1..10)/10 # ratio of rows each partition should update in a single visit to the partition,
+ # as a proportion of the total possible for the partition
+ perbatch: ~exp(1..3)/4 # number of rows each partition should update in a single batch statement,
+ # as a proportion of the proportion we are inserting this visit
+ # (i.e. compounds with (and capped by) pervisit)
+ batchtype: UNLOGGED # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+ simple1: select * from typestest where name = ? and choice = ? LIMIT 100
+ range1: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/cqlstress-insanity-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-insanity-example.yaml b/tools/cqlstress-insanity-example.yaml
new file mode 100644
index 0000000..e94c9c3
--- /dev/null
+++ b/tools/cqlstress-insanity-example.yaml
@@ -0,0 +1,102 @@
+#
+# This is an example YAML profile for cassandra-stress
+#
+# insert data
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
+#
+# read, using query simple1:
+# cassandra-stress profile=/home/jake/stress1.yaml ops(simple1=1)
+#
+# mixed workload (90/10)
+# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
+
+
+#
+# Keyspace info
+#
+keyspace: stresscql
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+ CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: insanitytest
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+ CREATE TABLE insanitytest (
+ name text,
+ choice boolean,
+ date timestamp,
+ address inet,
+ dbl double,
+ lval bigint,
+ fval float,
+ ival int,
+ uid timeuuid,
+ dates list<timestamp>,
+ inets set<inet>,
+ value blob,
+ PRIMARY KEY((name, choice), date)
+ ) WITH compaction = { 'class':'LeveledCompactionStrategy' }
+ AND comment='A table of many types to test wide rows and collections'
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows. Supported types are
+#
+# EXP(min..max) An exponential distribution over the range [min..max]
+# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
+# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
+# UNIFORM(min..max) A uniform distribution over the range [min, max]
+# FIXED(val) A fixed distribution, always returning the same value
+# Aliases: extr, gauss, normal, norm, weibull
+#
+# If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(1..256), population: uniform(1..100B)
+#
+columnspec:
+ - name: name
+ clustering: uniform(1..4)
+ - name: date
+ clustering: gaussian(1..20)
+ - name: lval
+ population: fixed(1)
+ - name: dates
+ clustering: uniform(1..100)
+ - name: inets
+ clustering: uniform(1..200)
+ - name: value
+
+insert:
+ partitions: fixed(1) # number of unique partitions to update in a single operation
+ # if perbatch < 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless already finished); only the row counts will vary
+ pervisit: uniform(1..10)/100K # ratio of rows each partition should update in a single visit to the partition,
+ # as a proportion of the total possible for the partition
+ perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement,
+ # as a proportion of the proportion we are inserting this visit
+ # (i.e. compounds with (and capped by) pervisit)
+ batchtype: UNLOGGED # type of batch to use
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+ simple1: select * from insanitytest where name = ? and choice = ? LIMIT 100
+
+#
+# In order to generate data consistently we need something to generate a unique key for this schema profile.
+#
+seed: changing this string changes the generated data. its hashcode is used as the random seed.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 87afb3d..7831074 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -18,203 +18,47 @@
package org.apache.cassandra.stress;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.cassandra.stress.generatedata.Distribution;
-import org.apache.cassandra.stress.generatedata.KeyGen;
-import org.apache.cassandra.stress.generatedata.RowGen;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.settings.*;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class Operation
{
- public final long index;
- protected final State state;
+ public final StressSettings settings;
+ public final Timer timer;
+ public final PartitionGenerator generator;
+ public final Distribution partitionCount;
- public Operation(State state, long idx)
+ protected List<Partition> partitions;
+
+ public Operation(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount)
{
- index = idx;
- this.state = state;
+ this.generator = generator;
+ this.timer = timer;
+ this.settings = settings;
+ this.partitionCount = partitionCount;
}
public static interface RunOp
{
public boolean run() throws Exception;
- public String key();
- public int keyCount();
+ public int partitionCount();
+ public int rowCount();
}
- // one per thread!
- public static final class State
+ protected void setPartitions(List<Partition> partitions)
{
-
- public final StressSettings settings;
- public final Timer timer;
- public final Command type;
- public final KeyGen keyGen;
- public final RowGen rowGen;
- public final Distribution counteradd;
- public final List<ColumnParent> columnParents;
- public final StressMetrics metrics;
- public final SettingsCommandMixed.CommandSelector commandSelector;
- private final EnumMap<Command, State> substates;
- private Object cqlCache;
-
- public State(Command type, StressSettings settings, StressMetrics metrics)
- {
- this.type = type;
- this.timer = metrics.getTiming().newTimer();
- if (type == Command.MIXED)
- {
- commandSelector = ((SettingsCommandMixed) settings.command).selector();
- substates = new EnumMap<>(Command.class);
- }
- else
- {
- commandSelector = null;
- substates = null;
- }
- counteradd = settings.command.add.get();
- this.settings = settings;
- this.keyGen = settings.keys.newKeyGen();
- this.rowGen = settings.columns.newRowGen();
- this.metrics = metrics;
- this.columnParents = columnParents(type, settings);
- }
-
- private State(Command type, State copy)
- {
- this.type = type;
- this.timer = copy.timer;
- this.rowGen = copy.rowGen;
- this.keyGen = copy.keyGen;
- this.columnParents = columnParents(type, copy.settings);
- this.metrics = copy.metrics;
- this.settings = copy.settings;
- this.counteradd = copy.counteradd;
- this.substates = null;
- this.commandSelector = null;
- }
-
- private List<ColumnParent> columnParents(Command type, StressSettings settings)
- {
- if (!settings.columns.useSuperColumns)
- return Collections.singletonList(new ColumnParent(type.table));
- else
- {
- ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
- for (int i = 0 ; i < cp.length ; i++)
- cp[i] = new ColumnParent(type.supertable).setSuper_column(ByteBufferUtil.bytes("S" + i));
- return Arrays.asList(cp);
- }
- }
-
-
-
- public boolean isCql3()
- {
- return settings.mode.cqlVersion == CqlVersion.CQL3;
- }
- public boolean isCql2()
- {
- return settings.mode.cqlVersion == CqlVersion.CQL2;
- }
- public Object getCqlCache()
- {
- return cqlCache;
- }
- public void storeCqlCache(Object val)
- {
- cqlCache = val;
- }
-
- public State substate(Command command)
- {
- assert type == Command.MIXED;
- State substate = substates.get(command);
- if (substate == null)
- {
- substates.put(command, substate = new State(command, this));
- }
- return substate;
- }
-
- }
-
- protected ByteBuffer getKey()
- {
- return state.keyGen.getKeys(1, index).get(0);
- }
-
- protected List<ByteBuffer> getKeys(int count)
- {
- return state.keyGen.getKeys(count, index);
- }
-
- protected List<ByteBuffer> generateColumnValues(ByteBuffer key)
- {
- return state.rowGen.generate(index, key);
- }
-
- private int sliceStart(int count)
- {
- if (count == state.settings.columns.maxColumnsPerKey)
- return 0;
- return 1 + ThreadLocalRandom.current().nextInt(state.settings.columns.maxColumnsPerKey - count);
- }
-
- protected SlicePredicate slicePredicate()
- {
- final SlicePredicate predicate = new SlicePredicate();
- if (state.settings.columns.slice)
- {
- int count = state.rowGen.count(index);
- int start = sliceStart(count);
- predicate.setSlice_range(new SliceRange()
- .setStart(state.settings.columns.names.get(start))
- .setFinish(new byte[] {})
- .setReversed(false)
- .setCount(count)
- );
- }
- else
- predicate.setColumn_names(randomNames());
- return predicate;
- }
-
- protected List<ByteBuffer> randomNames()
- {
- int count = state.rowGen.count(index);
- List<ByteBuffer> src = state.settings.columns.names;
- if (count == src.size())
- return src;
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
- List<ByteBuffer> r = new ArrayList<>();
- int c = 0, o = 0;
- while (c < count && count + o < src.size())
- {
- int leeway = src.size() - (count + o);
- int spreadover = count - c;
- o += Math.round(rnd.nextDouble() * (leeway / (double) spreadover));
- r.add(src.get(o + c++));
- }
- while (c < count)
- r.add(src.get(o + c++));
- return r;
+ this.partitions = partitions;
}
/**
@@ -234,13 +78,13 @@ public abstract class Operation
public void timeWithRetry(RunOp run) throws IOException
{
- state.timer.start();
+ timer.start();
boolean success = false;
String exceptionMessage = null;
int tries = 0;
- for (; tries < state.settings.command.tries; tries++)
+ for (; tries < settings.command.tries; tries++)
{
try
{
@@ -249,7 +93,7 @@ public abstract class Operation
}
catch (Exception e)
{
- switch (state.settings.log.level)
+ switch (settings.log.level)
{
case MINIMAL:
break;
@@ -269,15 +113,13 @@ public abstract class Operation
}
}
- state.timer.stop(run.keyCount());
+ timer.stop(run.partitionCount(), run.rowCount());
if (!success)
{
- error(String.format("Operation [%d] x%d key %s (0x%s) %s%n",
- index,
+ error(String.format("Operation x%d on key(s) %s: %s%n",
tries,
- run.key(),
- ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(run.key())),
+ key(),
(exceptionMessage == null)
? "Data returned was not validated"
: "Error executing: " + exceptionMessage));
@@ -285,6 +127,14 @@ public abstract class Operation
}
+ private String key()
+ {
+ List<String> keys = new ArrayList<>();
+ for (Partition partition : partitions)
+ keys.add(partition.getKeyAsString());
+ return keys.toString();
+ }
+
protected String getExceptionMessage(Exception e)
{
String className = e.getClass().getSimpleName();
@@ -294,9 +144,9 @@ public abstract class Operation
protected void error(String message) throws IOException
{
- if (!state.settings.command.ignoreErrors)
+ if (!settings.command.ignoreErrors)
throw new IOException(message);
- else if (state.settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
+ else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
System.err.println(message);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 07ba1d8..2105a72 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -29,10 +30,15 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.stress.operations.*;
+
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.SeedGenerator;
+import org.apache.cassandra.stress.operations.OpDistribution;
+import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.settings.*;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.transport.SimpleClient;
public class StressAction implements Runnable
@@ -52,7 +58,8 @@ public class StressAction implements Runnable
// creating keyspace and column families
settings.maybeCreateKeyspaces();
- warmup(settings.command.type, settings.command);
+ if (!settings.command.noWarmup)
+ warmup(settings.command.getFactory(settings));
output.println("Sleeping 2s...");
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
@@ -61,7 +68,7 @@ public class StressAction implements Runnable
if (settings.rate.auto)
success = runAuto();
else
- success = null != run(settings.command.type, settings.rate.threadCount, settings.command.count, output);
+ success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count, output);
if (success)
output.println("END");
@@ -72,33 +79,18 @@ public class StressAction implements Runnable
}
// type provided separately to support recursive call for mixed command with each command type it is performing
- private void warmup(Command type, SettingsCommand command)
+ private void warmup(OpDistributionFactory operations)
{
// warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
- int iterations;
- switch (type.category)
+ int iterations = 50000 * settings.node.nodes.size();
+ for (OpDistributionFactory single : operations.each())
{
- case BASIC:
- iterations = 50000;
- break;
- case MIXED:
- for (Command subtype : ((SettingsCommandMixed) command).getCommands())
- warmup(subtype, command);
- return;
- case MULTI:
- int keysAtOnce = command.keysAtOnce;
- iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce));
- break;
- default:
- throw new IllegalStateException();
+ // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
+ // so warm up all the nodes we're speaking to only.
+ output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
+ run(single, 20, iterations, warmupOutput);
}
-
- // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
- // so warm up all the nodes we're speaking to only.
- iterations *= settings.node.nodes.size();
- output.println(String.format("Warming up %s with %d iterations...", type, iterations));
- run(type, 20, iterations, warmupOutput);
}
// TODO : permit varying more than just thread count
@@ -113,7 +105,7 @@ public class StressAction implements Runnable
{
output.println(String.format("Running with %d threadCount", threadCount));
- StressMetrics result = run(settings.command.type, threadCount, settings.command.count, output);
+ StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count, output);
if (result == null)
return false;
results.add(result);
@@ -170,13 +162,13 @@ public class StressAction implements Runnable
return improvement / count;
}
- private StressMetrics run(Command type, int threadCount, long opCount, PrintStream output)
+ private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, PrintStream output)
{
output.println(String.format("Running %s with %d threads %s",
- type.toString(),
- threadCount,
- opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
+ operations.desc(),
+ threadCount,
+ opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
final WorkQueue workQueue;
if (opCount < 0)
workQueue = new ContinuousWorkQueue(50);
@@ -193,7 +185,7 @@ public class StressAction implements Runnable
final CountDownLatch done = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
- consumers[i] = new Consumer(type, done, workQueue, metrics, rateLimiter);
+ consumers[i] = new Consumer(operations, done, workQueue, metrics, rateLimiter);
// starting worker threadCount
for (int i = 0; i < threadCount; i++)
@@ -236,18 +228,24 @@ public class StressAction implements Runnable
private class Consumer extends Thread
{
- private final Operation.State state;
+ private final OpDistribution operations;
+ private final StressMetrics metrics;
+ private final Timer timer;
+ private final SeedGenerator seedGenerator;
private final RateLimiter rateLimiter;
private volatile boolean success = true;
private final WorkQueue workQueue;
private final CountDownLatch done;
- public Consumer(Command type, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+ public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
{
this.done = done;
this.rateLimiter = rateLimiter;
this.workQueue = workQueue;
- this.state = new Operation.State(type, settings, metrics);
+ this.metrics = metrics;
+ this.timer = metrics.getTiming().newTimer();
+ this.seedGenerator = settings.keys.newSeedGenerator();
+ this.operations = operations.get(timer);
}
public void run()
@@ -269,63 +267,89 @@ public class StressAction implements Runnable
sclient = settings.getSimpleNativeClient();
break;
case THRIFT:
- tclient = settings.getThriftClient();
- break;
case THRIFT_SMART:
- tclient = settings.getSmartThriftClient();
+ tclient = settings.getThriftClient();
break;
default:
throw new IllegalStateException();
}
- Work work;
- while ( null != (work = workQueue.poll()) )
+ int maxBatchSize = operations.maxBatchSize();
+ Work work = workQueue.poll();
+ Partition[] partitions = new Partition[maxBatchSize];
+ int workDone = 0;
+ while (work != null)
{
- if (rateLimiter != null)
- rateLimiter.acquire(work.count);
+ Operation op = operations.next();
+ op.generator.reset();
+ int batchSize = Math.max(1, (int) op.partitionCount.next());
+ int partitionCount = 0;
- for (int i = 0 ; i < work.count ; i++)
+ while (partitionCount < batchSize)
{
- try
+ int count = Math.min((work.count - workDone), batchSize - partitionCount);
+ for (int i = 0 ; i < count ; i++)
{
- Operation op = createOperation(state, i + work.offset);
- switch (settings.mode.api)
- {
- case JAVA_DRIVER_NATIVE:
- op.run(jclient);
- break;
- case SIMPLE_NATIVE:
- op.run(sclient);
- break;
- case THRIFT:
- case THRIFT_SMART:
- default:
- op.run(tclient);
- }
- } catch (Exception e)
+ long seed = seedGenerator.next(work.offset + workDone + i);
+ partitions[partitionCount + i] = op.generator.generate(seed);
+ }
+ workDone += count;
+ partitionCount += count;
+ if (workDone == work.count)
{
- if (output == null)
+ workDone = 0;
+ work = workQueue.poll();
+ if (work == null)
{
- System.err.println(e.getMessage());
- success = false;
- System.exit(-1);
+ if (partitionCount == 0)
+ return;
+ break;
}
+ if (rateLimiter != null)
+ rateLimiter.acquire(work.count);
+ }
+ }
+
+ op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount));
- e.printStackTrace(output);
+ try
+ {
+ switch (settings.mode.api)
+ {
+ case JAVA_DRIVER_NATIVE:
+ op.run(jclient);
+ break;
+ case SIMPLE_NATIVE:
+ op.run(sclient);
+ break;
+ case THRIFT:
+ case THRIFT_SMART:
+ default:
+ op.run(tclient);
+ }
+ }
+ catch (Exception e)
+ {
+ if (output == null)
+ {
+ System.err.println(e.getMessage());
success = false;
- workQueue.stop();
- state.metrics.cancel();
- return;
+ System.exit(-1);
}
+
+ e.printStackTrace(output);
+ success = false;
+ workQueue.stop();
+ metrics.cancel();
+ return;
}
}
-
}
finally
{
done.countDown();
- state.timer.close();
+ timer.close();
}
}
@@ -443,106 +467,4 @@ public class StressAction implements Runnable
}
- private Operation createOperation(Operation.State state, long index)
- {
- return createOperation(state.type, state, index);
- }
- private Operation createOperation(Command type, Operation.State state, long index)
- {
- switch (type)
- {
- case READ:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftReader(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlReader(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
-
- case COUNTER_READ:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftCounterGetter(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlCounterGetter(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case WRITE:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftInserter(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlInserter(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case COUNTER_WRITE:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftCounterAdder(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlCounterAdder(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case RANGE_SLICE:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftRangeSlicer(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlRangeSlicer(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case INDEXED_RANGE_SLICE:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftIndexedRangeSlicer(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlIndexedRangeSlicer(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case READ_MULTI:
- switch(state.settings.mode.style)
- {
- case THRIFT:
- return new ThriftMultiGetter(state, index);
- case CQL:
- case CQL_PREPARED:
- return new CqlMultiGetter(state, index);
- default:
- throw new UnsupportedOperationException();
- }
-
- case MIXED:
- Command subcommand = state.commandSelector.next();
- return createOperation(subcommand, state.substate(subcommand), index);
-
- }
-
- throw new UnsupportedOperationException();
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 54a1e2c..7e5c1b6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -127,20 +127,21 @@ public class StressMetrics
// PRINT FORMATTING
- public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
- public static final String ROWFORMAT = "%-10d,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
+ public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
+ public static final String ROWFORMAT = "%-10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
private static void printHeader(String prefix, PrintStream output)
{
- output.println(prefix + String.format(HEADFORMAT, "ops","op/s", "key/s","mean","med",".95",".99",".999","max","time","stderr"));
+ output.println(prefix + String.format(HEADFORMAT, "partitions","op/s", "pk/s", "row/s","mean","med",".95",".99",".999","max","time","stderr"));
}
private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
{
output.println(prefix + String.format(ROWFORMAT,
- total.operationCount,
+ total.partitionCount,
interval.realOpRate(),
- interval.keyRate(),
+ interval.partitionRate(),
+ interval.rowRate(),
interval.meanLatency(),
interval.medianLatency(),
interval.rankLatency(0.95f),
@@ -156,9 +157,9 @@ public class StressMetrics
output.println("\n");
output.println("Results:");
TimingInterval history = timing.getHistory();
- output.println(String.format("real op rate : %.0f", history.realOpRate()));
- output.println(String.format("adjusted op rate stderr : %.0f", opRateUncertainty.getUncertainty()));
- output.println(String.format("key rate : %.0f", history.keyRate()));
+ output.println(String.format("op rate : %.0f", history.realOpRate()));
+ output.println(String.format("partition rate : %.0f", history.partitionRate()));
+ output.println(String.format("row rate : %.0f", history.rowRate()));
output.println(String.format("latency mean : %.1f", history.meanLatency()));
output.println(String.format("latency median : %.1f", history.medianLatency()));
output.println(String.format("latency 95th percentile : %.1f", history.rankLatency(.95f)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
new file mode 100644
index 0000000..13f26a2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.stress;
+
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.RatioDistributionFactory;
+import org.apache.cassandra.stress.generate.values.Booleans;
+import org.apache.cassandra.stress.generate.values.Bytes;
+import org.apache.cassandra.stress.generate.values.Generator;
+import org.apache.cassandra.stress.generate.values.Dates;
+import org.apache.cassandra.stress.generate.values.Doubles;
+import org.apache.cassandra.stress.generate.values.Floats;
+import org.apache.cassandra.stress.generate.values.GeneratorConfig;
+import org.apache.cassandra.stress.generate.values.Inets;
+import org.apache.cassandra.stress.generate.values.Integers;
+import org.apache.cassandra.stress.generate.values.Lists;
+import org.apache.cassandra.stress.generate.values.Longs;
+import org.apache.cassandra.stress.generate.values.Sets;
+import org.apache.cassandra.stress.generate.values.Strings;
+import org.apache.cassandra.stress.generate.values.TimeUUIDs;
+import org.apache.cassandra.stress.generate.values.UUIDs;
+import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
+import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
+import org.apache.cassandra.stress.settings.OptionDistribution;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.thrift.TException;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+import org.yaml.snakeyaml.error.YAMLException;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StressProfile implements Serializable
+{
+ private String keyspaceCql;
+ private String tableCql;
+ private String seedStr;
+
+ public String keyspaceName;
+ public String tableName;
+ private Map<String, GeneratorConfig> columnConfigs;
+ private Map<String, String> queries;
+ private Map<String, String> insert;
+
+ transient volatile TableMetadata tableMetaData;
+
+ transient volatile GeneratorFactory generatorFactory;
+
+ transient volatile BatchStatement.Type batchType;
+ transient volatile DistributionFactory partitions;
+ transient volatile RatioDistributionFactory pervisit;
+ transient volatile RatioDistributionFactory perbatch;
+ transient volatile PreparedStatement insertStatement;
+ transient volatile Integer thriftInsertId;
+
+ transient volatile Map<String, PreparedStatement> queryStatements;
+ transient volatile Map<String, Integer> thriftQueryIds;
+
+ private void init(StressYaml yaml) throws RequestValidationException
+ {
+ keyspaceName = yaml.keyspace;
+ keyspaceCql = yaml.keyspace_definition;
+ tableName = yaml.table;
+ tableCql = yaml.table_definition;
+ seedStr = yaml.seed;
+ queries = yaml.queries;
+ insert = yaml.insert;
+
+ assert keyspaceName != null : "keyspace name is required in yaml file";
+ assert tableName != null : "table name is required in yaml file";
+ assert queries != null : "queries map is required in yaml file";
+
+ if (keyspaceCql != null && keyspaceCql.length() > 0)
+ {
+ String name = ((CreateKeyspaceStatement) QueryProcessor.parseStatement(keyspaceCql)).keyspace();
+ assert name.equalsIgnoreCase(keyspaceName) : "Name in keyspace_definition doesn't match keyspace property: '" + name + "' != '" + keyspaceName + "'";
+ }
+ else
+ {
+ keyspaceCql = null;
+ }
+
+ if (tableCql != null && tableCql.length() > 0)
+ {
+ String name = CFMetaData.compile(tableCql, keyspaceName).cfName;
+ assert name.equalsIgnoreCase(tableName) : "Name in table_definition doesn't match table property: '" + name + "' != '" + tableName + "'";
+ }
+ else
+ {
+ tableCql = null;
+ }
+
+ columnConfigs = new HashMap<>();
+ for (Map<String,Object> spec : yaml.columnspec)
+ {
+ lowerCase(spec);
+ String name = (String) spec.remove("name");
+ DistributionFactory population = !spec.containsKey("population") ? null : OptionDistribution.get((String) spec.remove("population"));
+ DistributionFactory size = !spec.containsKey("size") ? null : OptionDistribution.get((String) spec.remove("size"));
+ DistributionFactory clustering = !spec.containsKey("cluster") ? null : OptionDistribution.get((String) spec.remove("cluster"));
+
+ if (!spec.isEmpty())
+ throw new IllegalArgumentException("Unrecognised option(s) in column spec: " + spec);
+ if (name == null)
+ throw new IllegalArgumentException("Missing name argument in column spec");
+
+ GeneratorConfig config = new GeneratorConfig(yaml.seed + name, clustering, size, population);
+ columnConfigs.put(name, config);
+ }
+ }
+
+ public void maybeCreateSchema(StressSettings settings)
+ {
+ JavaDriverClient client = settings.getJavaDriverClient(false);
+
+ if (keyspaceCql != null)
+ {
+ try
+ {
+ client.execute(keyspaceCql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+ }
+ catch (AlreadyExistsException e)
+ {
+ }
+ }
+
+ client.execute("use "+keyspaceName, org.apache.cassandra.db.ConsistencyLevel.ONE);
+
+ if (tableCql != null)
+ {
+ try
+ {
+ client.execute(tableCql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+ }
+ catch (AlreadyExistsException e)
+ {
+ }
+
+ System.out.println(String.format("Created schema. Sleeping %ss for propagation.", settings.node.nodes.size()));
+ Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+ }
+
+ maybeLoadSchemaInfo(settings);
+ }
+
+
+ private void maybeLoadSchemaInfo(StressSettings settings)
+ {
+ if (tableMetaData == null)
+ {
+ JavaDriverClient client = settings.getJavaDriverClient();
+
+ synchronized (client)
+ {
+
+ if (tableMetaData != null)
+ return;
+
+ TableMetadata metadata = client.getCluster()
+ .getMetadata()
+ .getKeyspace(keyspaceName)
+ .getTable(tableName);
+
+ //Fill in missing column configs
+ for (ColumnMetadata col : metadata.getColumns())
+ {
+ if (columnConfigs.containsKey(col.getName()))
+ continue;
+
+ columnConfigs.put(col.getName(), new GeneratorConfig(seedStr + col.getName(), null, null, null));
+ }
+
+ tableMetaData = metadata;
+ }
+ }
+ }
+
+ public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ if (queryStatements == null)
+ {
+ synchronized (this)
+ {
+ if (queryStatements == null)
+ {
+ try
+ {
+ JavaDriverClient jclient = settings.getJavaDriverClient();
+ ThriftClient tclient = settings.getThriftClient();
+ Map<String, PreparedStatement> stmts = new HashMap<>();
+ Map<String, Integer> tids = new HashMap<>();
+ for (Map.Entry<String, String> e : queries.entrySet())
+ {
+ stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue()));
+ tids.put(e.getKey().toLowerCase(), tclient.prepare_cql3_query(e.getValue(), Compression.NONE));
+ }
+ thriftQueryIds = tids;
+ queryStatements = stmts;
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ // TODO validation
+ name = name.toLowerCase();
+ return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL);
+ }
+
+ public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ if (insertStatement == null)
+ {
+ synchronized (this)
+ {
+ if (insertStatement == null)
+ {
+ maybeLoadSchemaInfo(settings);
+
+ Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
+
+ //Non PK Columns
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("UPDATE \"").append(tableName).append("\" SET ");
+
+ //PK Columns
+ StringBuilder pred = new StringBuilder();
+ pred.append(" WHERE ");
+
+ boolean firstCol = true;
+ boolean firstPred = true;
+ for (ColumnMetadata c : tableMetaData.getColumns())
+ {
+
+ if (keyColumns.contains(c))
+ {
+ if (firstPred)
+ firstPred = false;
+ else
+ pred.append(" AND ");
+
+ pred.append(c.getName()).append(" = ?");
+ }
+ else
+ {
+ if (firstCol)
+ firstCol = false;
+ else
+ sb.append(",");
+
+ sb.append(c.getName()).append(" = ");
+
+ switch (c.getType().getName())
+ {
+ case SET:
+ case LIST:
+ case COUNTER:
+ sb.append(c.getName()).append(" + ?");
+ break;
+ default:
+ sb.append("?");
+ break;
+ }
+ }
+ }
+
+ //Put PK predicates at the end
+ sb.append(pred);
+
+ if (insert == null)
+ insert = new HashMap<>();
+ lowerCase(insert);
+
+ partitions = OptionDistribution.get(!insert.containsKey("partitions") ? "fixed(1)" : insert.remove("partitions"));
+ pervisit = OptionRatioDistribution.get(!insert.containsKey("pervisit") ? "fixed(1)/1" : insert.remove("pervisit"));
+ perbatch = OptionRatioDistribution.get(!insert.containsKey("perbatch") ? "fixed(1)/1" : insert.remove("perbatch"));
+ batchType = !insert.containsKey("batchtype") ? BatchStatement.Type.UNLOGGED : BatchStatement.Type.valueOf(insert.remove("batchtype"));
+ if (!insert.isEmpty())
+ throw new IllegalArgumentException("Unrecognised insert option(s): " + insert);
+
+ if (generator.maxRowCount > 100 * 1000 * 1000)
+ System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))\n", generator.maxRowCount);
+ if (perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount > 100000)
+ System.err.printf("WARNING: You have defined a schema that permits very large batches (%.0f max rows (>100K)). This may OOM this stress client, or the server.\n",
+ perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount);
+
+ JavaDriverClient client = settings.getJavaDriverClient();
+ String query = sb.toString();
+ try
+ {
+ thriftInsertId = settings.getThriftClient().prepare_cql3_query(query, Compression.NONE);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ insertStatement = client.prepare(query);
+ }
+ }
+ }
+
+ return new SchemaInsert(timer, generator, settings, partitions.get(), pervisit.get(), perbatch.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+ }
+
+ public PartitionGenerator newGenerator(StressSettings settings)
+ {
+ if (generatorFactory == null)
+ {
+ synchronized (this)
+ {
+ maybeLoadSchemaInfo(settings);
+ if (generatorFactory == null)
+ generatorFactory = new GeneratorFactory();
+ }
+ }
+
+ return generatorFactory.newGenerator();
+ }
+
+ private class GeneratorFactory
+ {
+ final List<ColumnInfo> partitionKeys = new ArrayList<>();
+ final List<ColumnInfo> clusteringColumns = new ArrayList<>();
+ final List<ColumnInfo> valueColumns = new ArrayList<>();
+
+ private GeneratorFactory()
+ {
+ Set<ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
+
+ for (ColumnMetadata metadata : tableMetaData.getPartitionKey())
+ partitionKeys.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+ for (ColumnMetadata metadata : tableMetaData.getClusteringColumns())
+ clusteringColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+ for (ColumnMetadata metadata : tableMetaData.getColumns())
+ if (!keyColumns.contains(metadata))
+ valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
+ }
+
+ PartitionGenerator newGenerator()
+ {
+ return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns));
+ }
+
+ List<Generator> get(List<ColumnInfo> columnInfos)
+ {
+ List<Generator> result = new ArrayList<>();
+ for (ColumnInfo columnInfo : columnInfos)
+ result.add(columnInfo.getGenerator());
+ return result;
+ }
+ }
+
+ static class ColumnInfo
+ {
+ final String name;
+ final DataType type;
+ final GeneratorConfig config;
+
+ ColumnInfo(String name, DataType type, GeneratorConfig config)
+ {
+ this.name = name;
+ this.type = type;
+ this.config = config;
+ }
+
+ Generator getGenerator()
+ {
+ return getGenerator(name, type, config);
+ }
+
+ static Generator getGenerator(final String name, final DataType type, GeneratorConfig config)
+ {
+ switch (type.getName())
+ {
+ case ASCII:
+ case TEXT:
+ case VARCHAR:
+ return new Strings(name, config);
+ case BIGINT:
+ case COUNTER:
+ return new Longs(name, config);
+ case BLOB:
+ return new Bytes(name, config);
+ case BOOLEAN:
+ return new Booleans(name, config);
+ case DECIMAL:
+ case DOUBLE:
+ return new Doubles(name, config);
+ case FLOAT:
+ return new Floats(name, config);
+ case INET:
+ return new Inets(name, config);
+ case INT:
+ case VARINT:
+ return new Integers(name, config);
+ case TIMESTAMP:
+ return new Dates(name, config);
+ case UUID:
+ return new UUIDs(name, config);
+ case TIMEUUID:
+ return new TimeUUIDs(name, config);
+ case SET:
+ return new Sets(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+ case LIST:
+ return new Lists(name, getGenerator(name, type.getTypeArguments().get(0), config), config);
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
+ public static StressProfile load(File file) throws IOError
+ {
+ try
+ {
+ byte[] profileBytes = Files.readAllBytes(Paths.get(file.toURI()));
+
+ Constructor constructor = new Constructor(StressYaml.class);
+
+ Yaml yaml = new Yaml(constructor);
+
+ StressYaml profileYaml = yaml.loadAs(new ByteArrayInputStream(profileBytes), StressYaml.class);
+
+ StressProfile profile = new StressProfile();
+ profile.init(profileYaml);
+
+ return profile;
+ }
+ catch (YAMLException | IOException | RequestValidationException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ static <V> void lowerCase(Map<String, V> map)
+ {
+ List<Map.Entry<String, V>> reinsert = new ArrayList<>();
+ Iterator<Map.Entry<String, V>> iter = map.entrySet().iterator();
+ while (iter.hasNext())
+ {
+ Map.Entry<String, V> e = iter.next();
+ if (!e.getKey().toLowerCase().equalsIgnoreCase(e.getKey()))
+ {
+ reinsert.add(e);
+ iter.remove();
+ }
+ }
+ for (Map.Entry<String, V> e : reinsert)
+ map.put(e.getKey().toLowerCase(), e.getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
new file mode 100644
index 0000000..e94fa77
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.stress;
+
+import java.util.List;
+import java.util.Map;
+
+public class StressYaml
+{
+ public String seed;
+ public String keyspace;
+ public String keyspace_definition;
+ public String table;
+ public String table_definition;
+
+ public List<Map<String,Object>> columnspec;
+ public Map<String,String> queries;
+ public Map<String,String> insert;
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java b/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
new file mode 100644
index 0000000..4662454
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Distribution.java
@@ -0,0 +1,57 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.Serializable;
+
+public abstract class Distribution implements Serializable
+{
+
+ public abstract long next();
+ public abstract double nextDouble();
+ public abstract long inverseCumProb(double cumProb);
+ public abstract void setSeed(long seed);
+
+ public long maxValue()
+ {
+ return inverseCumProb(1d);
+ }
+
+ public long minValue()
+ {
+ return inverseCumProb(0d);
+ }
+
+ // approximation of the average; slightly costly to calculate, so should not be invoked frequently
+ public long average()
+ {
+ double sum = 0;
+ int count = 0;
+ for (float d = 0 ; d <= 1.0d ; d += 0.02d)
+ {
+ sum += inverseCumProb(d);
+ count += 1;
+ }
+ return (long) (sum / count);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
new file mode 100644
index 0000000..23ce3e9
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionBoundApache.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+public class DistributionBoundApache extends Distribution
+{
+
+ final AbstractRealDistribution delegate;
+ final long min, max;
+
+ public DistributionBoundApache(AbstractRealDistribution delegate, long min, long max)
+ {
+ this.delegate = delegate;
+ this.min = min;
+ this.max = max;
+ }
+
+ @Override
+ public long next()
+ {
+ return bound(min, max, delegate.sample());
+ }
+
+ public double nextDouble()
+ {
+ return boundDouble(min, max, delegate.sample());
+ }
+
+ @Override
+ public long inverseCumProb(double cumProb)
+ {
+ return bound(min, max, delegate.inverseCumulativeProbability(cumProb));
+ }
+
+ public void setSeed(long seed)
+ {
+ delegate.reseedRandomGenerator(seed);
+ }
+
+ private static long bound(long min, long max, double val)
+ {
+ long r = (long) val;
+ if ((r >= min) & (r <= max))
+ return r;
+ if (r < min)
+ return min;
+ if (r > max)
+ return max;
+ throw new IllegalStateException();
+ }
+
+ private static double boundDouble(long min, long max, double r)
+ {
+ if ((r >= min) & (r <= max))
+ return r;
+ if (r < min)
+ return min;
+ if (r > max)
+ return max;
+ throw new IllegalStateException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
new file mode 100644
index 0000000..d0dfa89
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFactory.java
@@ -0,0 +1,31 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.Serializable;
+
+public interface DistributionFactory extends Serializable
+{
+
+ Distribution get();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
new file mode 100644
index 0000000..bbfb894
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionFixed.java
@@ -0,0 +1,54 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+public class DistributionFixed extends Distribution
+{
+
+ final long key;
+
+ public DistributionFixed(long key)
+ {
+ this.key = key;
+ }
+
+ @Override
+ public long next()
+ {
+ return key;
+ }
+
+ public double nextDouble()
+ {
+ return key;
+ }
+
+ @Override
+ public long inverseCumProb(double cumProb)
+ {
+ return key;
+ }
+
+ public void setSeed(long seed)
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
new file mode 100644
index 0000000..df52cb8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.stress.generate;
+
+public class DistributionInverted extends Distribution
+{
+
+ final Distribution wrapped;
+ final long min;
+ final long max;
+
+ public DistributionInverted(Distribution wrapped)
+ {
+ this.wrapped = wrapped;
+ this.min = wrapped.minValue();
+ this.max = wrapped.maxValue();
+ }
+
+ public long next()
+ {
+ return max - (wrapped.next() - min);
+ }
+
+ public double nextDouble()
+ {
+ return max - (wrapped.nextDouble() - min);
+ }
+
+ public long inverseCumProb(double cumProb)
+ {
+ return max - (wrapped.inverseCumProb(cumProb) - min);
+ }
+
+ public void setSeed(long seed)
+ {
+ wrapped.setSeed(seed);
+ }
+
+}