You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/07 16:38:54 UTC
[11/15] git commit: Improve stress workload realism
Improve stress workload realism
patch by benedict; reviewed by tjake for CASSANDRA-7519
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0580fb2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0580fb2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0580fb2b
Branch: refs/heads/trunk
Commit: 0580fb2b7707beaa69019a73a6c53d86fe088a0a
Parents: c6a2c65
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Sep 7 21:18:53 2014 +0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Sep 7 21:19:58 2014 +0700
----------------------------------------------------------------------
CHANGES.txt | 2 +-
tools/cqlstress-counter-example.yaml | 20 +-
tools/cqlstress-example.yaml | 25 +-
tools/cqlstress-insanity-example.yaml | 20 +-
.../org/apache/cassandra/stress/Operation.java | 9 +-
.../apache/cassandra/stress/StressAction.java | 169 +++-------
.../apache/cassandra/stress/StressMetrics.java | 26 +-
.../apache/cassandra/stress/StressProfile.java | 75 ++++-
.../org/apache/cassandra/stress/StressYaml.java | 12 +-
.../stress/generate/DistributionInverted.java | 7 +
.../stress/generate/DistributionQuantized.java | 90 +++++
.../cassandra/stress/generate/FasterRandom.java | 116 +++++++
.../cassandra/stress/generate/Partition.java | 327 +++++++++++++++----
.../stress/generate/PartitionGenerator.java | 28 +-
.../stress/generate/RatioDistribution.java | 5 +
.../apache/cassandra/stress/generate/Seed.java | 67 ++++
.../stress/generate/SeedGenerator.java | 29 --
.../cassandra/stress/generate/SeedManager.java | 249 ++++++++++++++
.../stress/generate/SeedRandomGenerator.java | 54 ---
.../stress/generate/SeedSeriesGenerator.java | 42 ---
.../stress/generate/values/Booleans.java | 2 +-
.../cassandra/stress/generate/values/Bytes.java | 9 +-
.../cassandra/stress/generate/values/Dates.java | 3 +-
.../stress/generate/values/Doubles.java | 2 +-
.../stress/generate/values/Floats.java | 2 +-
.../stress/generate/values/Generator.java | 4 +-
.../stress/generate/values/HexBytes.java | 2 +-
.../stress/generate/values/HexStrings.java | 4 +-
.../cassandra/stress/generate/values/Inets.java | 2 +-
.../stress/generate/values/Integers.java | 2 +-
.../cassandra/stress/generate/values/Lists.java | 2 +-
.../cassandra/stress/generate/values/Longs.java | 2 +-
.../cassandra/stress/generate/values/Sets.java | 2 +-
.../stress/generate/values/Strings.java | 12 +-
.../stress/generate/values/TimeUUIDs.java | 2 +-
.../cassandra/stress/generate/values/UUIDs.java | 2 +-
.../operations/predefined/CqlCounterAdder.java | 5 +
.../operations/predefined/CqlInserter.java | 5 +
.../predefined/PredefinedOperation.java | 2 +-
.../predefined/ThriftCounterAdder.java | 5 +
.../operations/predefined/ThriftInserter.java | 5 +
.../operations/userdefined/SchemaInsert.java | 80 +++--
.../operations/userdefined/SchemaQuery.java | 87 ++++-
.../operations/userdefined/SchemaStatement.java | 53 +--
.../cassandra/stress/settings/CliOption.java | 3 +-
.../stress/settings/OptionDistribution.java | 72 +++-
.../settings/OptionRatioDistribution.java | 40 ++-
.../stress/settings/SettingsCommand.java | 14 +-
.../settings/SettingsCommandPreDefined.java | 13 +-
.../SettingsCommandPreDefinedMixed.java | 4 +-
.../stress/settings/SettingsCommandUser.java | 16 +-
.../stress/settings/SettingsErrors.java | 92 ++++++
.../stress/settings/SettingsInsert.java | 103 ++++++
.../cassandra/stress/settings/SettingsKey.java | 153 ---------
.../stress/settings/SettingsPopulation.java | 176 ++++++++++
.../stress/settings/SettingsSchema.java | 17 +-
.../stress/settings/StressSettings.java | 23 +-
.../cassandra/stress/util/DynamicList.java | 259 +++++++++++++++
.../org/apache/cassandra/stress/util/Timer.java | 7 +-
.../apache/cassandra/stress/util/Timing.java | 13 +-
.../cassandra/stress/util/TimingInterval.java | 6 +-
61 files changed, 1955 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46836bf..e42d9c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,7 +5,7 @@
* cqlsh: DESCRIBE support for frozen UDTs, tuples (CASSANDRA-7863)
* Avoid exposing internal classes over JMX (CASSANDRA-7879)
* Add null check for keys when freezing collection (CASSANDRA-7869)
-
+ * Improve stress workload realism (CASSANDRA-7519)
2.1.0-rc7
* Add frozen keyword and require UDT to be frozen (CASSANDRA-7857)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-counter-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-counter-example.yaml b/tools/cqlstress-counter-example.yaml
index cff14b6..f8f70ea 100644
--- a/tools/cqlstress-counter-example.yaml
+++ b/tools/cqlstress-counter-example.yaml
@@ -62,19 +62,17 @@ columnspec:
population: fixed(1)
insert:
- partitions: fixed(1) # number of unique partitions to update in a single operation
- # if perbatch < 1, multiple batches will be used but all partitions will
- # occur in all batches (unless already finished); only the row counts will vary
- pervisit: fixed(1)/1 # ratio of rows each partition should update in a single visit to the partition,
- # as a proportion of the total possible for the partition
- perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement,
- # as a proportion of the proportion we are inserting this visit
- # (i.e. compounds with (and capped by) pervisit)
- batchtype: UNLOGGED # type of batch to use
+ partitions: fixed(1) # number of unique partitions to update in a single operation
+ # if batchcount > 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless they finish early); only the row counts will vary
+ batchtype: LOGGED # type of batch to use
+ select: fixed(1)/1 # uniform chance any single generated CQL row will be visited in a partition;
+ # generated for each partition independently, each time we visit it
#
# A list of queries you wish to run against the schema
#
queries:
- simple1: select * from counttest where name = ?
-
+ simple1:
+ cql: select * from counttest where name = ?
+ fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index d5c90a2..4dd5e4a 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -69,25 +69,26 @@ columnspec:
size: uniform(1..10)
population: uniform(1..1M) # the range of unique values to select for the field (default is 100Billion)
- name: date
- cluster: uniform(1..4)
+ cluster: uniform(20..40)
- name: lval
population: gaussian(1..1000)
cluster: uniform(1..4)
insert:
- partitions: uniform(1..50) # number of unique partitions to update in a single operation
- # if perbatch < 1, multiple batches will be used but all partitions will
- # occur in all batches (unless already finished); only the row counts will vary
- pervisit: uniform(1..10)/10 # ratio of rows each partition should update in a single visit to the partition,
- # as a proportion of the total possible for the partition
- perbatch: ~exp(1..3)/4 # number of rows each partition should update in a single batch statement,
- # as a proportion of the proportion we are inserting this visit
- # (i.e. compounds with (and capped by) pervisit)
- batchtype: UNLOGGED # type of batch to use
+ partitions: uniform(1..50) # number of unique partitions to update in a single operation
+ # if batchcount > 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless they finish early); only the row counts will vary
+ batchtype: LOGGED # type of batch to use
+ select: uniform(1..10)/10 # uniform chance any single generated CQL row will be visited in a partition;
+ # generated for each partition independently, each time we visit it
#
# A list of queries you wish to run against the schema
#
queries:
- simple1: select * from typestest where name = ? and choice = ? LIMIT 100
- range1: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
+ simple1:
+ cql: select * from typestest where name = ? and choice = ? LIMIT 100
+ fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
+ range1:
+ cql: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
+ fields: multirow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-insanity-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-insanity-example.yaml b/tools/cqlstress-insanity-example.yaml
index ef1bb3a..ea4f97f 100644
--- a/tools/cqlstress-insanity-example.yaml
+++ b/tools/cqlstress-insanity-example.yaml
@@ -74,19 +74,17 @@ columnspec:
insert:
- partitions: fixed(1) # number of unique partitions to update in a single operation
- # if perbatch < 1, multiple batches will be used but all partitions will
- # occur in all batches (unless already finished); only the row counts will vary
- pervisit: uniform(1..10)/10 # ratio of rows each partition should update in a single visit to the partition,
- # as a proportion of the total possible for the partition
- perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement,
- # as a proportion of the proportion we are inserting this visit
- # (i.e. compounds with (and capped by) pervisit)
- batchtype: UNLOGGED # type of batch to use
+ partitions: fixed(1) # number of unique partitions to update in a single operation
+ # if batchcount > 1, multiple batches will be used but all partitions will
+ # occur in all batches (unless they finish early); only the row counts will vary
+ batchtype: LOGGED # type of batch to use
+ select: fixed(1)/1 # uniform chance any single generated CQL row will be visited in a partition;
+ # generated for each partition independently, each time we visit it
#
# A list of queries you wish to run against the schema
#
queries:
- simple1: select * from insanitytest where name = ? and choice = ? LIMIT 100
-
+ simple1:
+ cql: select * from insanitytest where name = ? and choice = ? LIMIT 100
+ fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 7831074..5560240 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -61,6 +61,11 @@ public abstract class Operation
this.partitions = partitions;
}
+ public boolean isWrite()
+ {
+ return false;
+ }
+
/**
* Run operation
* @param client Cassandra Thrift client connection
@@ -84,7 +89,7 @@ public abstract class Operation
String exceptionMessage = null;
int tries = 0;
- for (; tries < settings.command.tries; tries++)
+ for (; tries < settings.errors.tries; tries++)
{
try
{
@@ -144,7 +149,7 @@ public abstract class Operation
protected void error(String message) throws IOException
{
- if (!settings.command.ignoreErrors)
+ if (!settings.errors.ignore)
throw new IOException(message);
else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
System.err.println(message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 2105a72..e58bfa1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -23,7 +23,6 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,7 +31,6 @@ import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.stress.generate.Partition;
-import org.apache.cassandra.stress.generate.SeedGenerator;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.settings.*;
@@ -58,6 +56,7 @@ public class StressAction implements Runnable
// creating keyspace and column families
settings.maybeCreateKeyspaces();
+ // TODO: warmup should
if (!settings.command.noWarmup)
warmup(settings.command.getFactory(settings));
@@ -155,8 +154,8 @@ public class StressAction implements Runnable
double improvement = 0;
for (int i = results.size() - count ; i < results.size() ; i++)
{
- double prev = results.get(i - 1).getTiming().getHistory().realOpRate();
- double cur = results.get(i).getTiming().getHistory().realOpRate();
+ double prev = results.get(i - 1).getTiming().getHistory().opRate();
+ double cur = results.get(i).getTiming().getHistory().opRate();
improvement += (cur - prev) / prev;
}
return improvement / count;
@@ -169,11 +168,11 @@ public class StressAction implements Runnable
operations.desc(),
threadCount,
opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
- final WorkQueue workQueue;
+ final WorkManager workManager;
if (opCount < 0)
- workQueue = new ContinuousWorkQueue(50);
+ workManager = new ContinuousWorkManager();
else
- workQueue = FixedWorkQueue.build(opCount);
+ workManager = new FixedWorkManager(opCount);
RateLimiter rateLimiter = null;
// TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
@@ -185,7 +184,7 @@ public class StressAction implements Runnable
final CountDownLatch done = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
- consumers[i] = new Consumer(operations, done, workQueue, metrics, rateLimiter);
+ consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter);
// starting worker threadCount
for (int i = 0; i < threadCount; i++)
@@ -201,14 +200,15 @@ public class StressAction implements Runnable
settings.command.minimumUncertaintyMeasurements,
settings.command.maximumUncertaintyMeasurements);
} catch (InterruptedException e) { }
- workQueue.stop();
+ workManager.stop();
}
try
{
done.await();
metrics.stop();
- } catch (InterruptedException e) {}
+ }
+ catch (InterruptedException e) {}
if (metrics.wasCancelled())
return null;
@@ -231,20 +231,18 @@ public class StressAction implements Runnable
private final OpDistribution operations;
private final StressMetrics metrics;
private final Timer timer;
- private final SeedGenerator seedGenerator;
private final RateLimiter rateLimiter;
private volatile boolean success = true;
- private final WorkQueue workQueue;
+ private final WorkManager workManager;
private final CountDownLatch done;
- public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+ public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics, RateLimiter rateLimiter)
{
this.done = done;
this.rateLimiter = rateLimiter;
- this.workQueue = workQueue;
+ this.workManager = workManager;
this.metrics = metrics;
this.timer = metrics.getTiming().newTimer();
- this.seedGenerator = settings.keys.newSeedGenerator();
this.operations = operations.get(timer);
}
@@ -275,42 +273,33 @@ public class StressAction implements Runnable
}
int maxBatchSize = operations.maxBatchSize();
- Work work = workQueue.poll();
Partition[] partitions = new Partition[maxBatchSize];
- int workDone = 0;
- while (work != null)
+ while (true)
{
+ // TODO: Operation should be able to encapsulate much of this behaviour
Operation op = operations.next();
op.generator.reset();
- int batchSize = Math.max(1, (int) op.partitionCount.next());
- int partitionCount = 0;
+ int batchSize = workManager.takePermits(Math.max(1, (int) op.partitionCount.next()));
+ if (batchSize < 0)
+ break;
+
+ if (rateLimiter != null)
+ rateLimiter.acquire(batchSize);
+
+ int partitionCount = 0;
while (partitionCount < batchSize)
{
- int count = Math.min((work.count - workDone), batchSize - partitionCount);
- for (int i = 0 ; i < count ; i++)
- {
- long seed = seedGenerator.next(work.offset + workDone + i);
- partitions[partitionCount + i] = op.generator.generate(seed);
- }
- workDone += count;
- partitionCount += count;
- if (workDone == work.count)
- {
- workDone = 0;
- work = workQueue.poll();
- if (work == null)
- {
- if (partitionCount == 0)
- return;
- break;
- }
- if (rateLimiter != null)
- rateLimiter.acquire(work.count);
- }
+ Partition p = op.generator.generate(op);
+ if (p == null)
+ break;
+ partitions[partitionCount++] = p;
}
+ if (partitionCount == 0)
+ break;
+
op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount));
try
@@ -340,7 +329,7 @@ public class StressAction implements Runnable
e.printStackTrace(output);
success = false;
- workQueue.stop();
+ workManager.stop();
metrics.cancel();
return;
}
@@ -356,107 +345,58 @@ public class StressAction implements Runnable
}
- private interface WorkQueue
+ private interface WorkManager
{
- // null indicates consumer should terminate
- Work poll();
+ // -1 indicates consumer should terminate
+ int takePermits(int count);
// signal all consumers to terminate
void stop();
}
- private static final class Work
- {
- // index of operations
- final long offset;
-
- // how many operations to perform
- final int count;
-
- public Work(long offset, int count)
- {
- this.offset = offset;
- this.count = count;
- }
- }
-
- private static final class FixedWorkQueue implements WorkQueue
+ private static final class FixedWorkManager implements WorkManager
{
- final ArrayBlockingQueue<Work> work;
- volatile boolean stop = false;
+ final AtomicLong permits;
- public FixedWorkQueue(ArrayBlockingQueue<Work> work)
+ public FixedWorkManager(long permits)
{
- this.work = work;
+ this.permits = new AtomicLong(permits);
}
@Override
- public Work poll()
+ public int takePermits(int count)
{
- if (stop)
- return null;
- return work.poll();
+ while (true)
+ {
+ long cur = permits.get();
+ if (cur == 0)
+ return -1;
+ count = (int) Math.min(count, cur);
+ long next = cur - count;
+ if (permits.compareAndSet(cur, next))
+ return count;
+ }
}
@Override
public void stop()
{
- stop = true;
+ permits.getAndSet(0);
}
-
- static FixedWorkQueue build(long operations)
- {
- // target splitting into around 50-500k items, with a minimum size of 20
- if (operations > Integer.MAX_VALUE * (1L << 19))
- throw new IllegalStateException("Cannot currently support more than approx 2^50 operations for one stress run. This is a LOT.");
- int batchSize = (int) (operations / (1 << 19));
- if (batchSize < 20)
- batchSize = 20;
- ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<>(
- (int) ((operations / batchSize)
- + (operations % batchSize == 0 ? 0 : 1))
- );
- long offset = 0;
- while (offset < operations)
- {
- work.add(new Work(offset, (int) Math.min(batchSize, operations - offset)));
- offset += batchSize;
- }
- return new FixedWorkQueue(work);
- }
-
}
- private static final class ContinuousWorkQueue implements WorkQueue
+ private static final class ContinuousWorkManager implements WorkManager
{
- final AtomicLong offset = new AtomicLong();
- final int batchSize;
volatile boolean stop = false;
- private ContinuousWorkQueue(int batchSize)
- {
- this.batchSize = batchSize;
- }
-
@Override
- public Work poll()
+ public int takePermits(int count)
{
if (stop)
- return null;
- return new Work(nextOffset(), batchSize);
- }
-
- private long nextOffset()
- {
- final int inc = batchSize;
- while (true)
- {
- final long cur = offset.get();
- if (offset.compareAndSet(cur, cur + inc))
- return cur;
- }
+ return -1;
+ return count;
}
@Override
@@ -466,5 +406,4 @@ public class StressAction implements Runnable
}
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 7e5c1b6..a9edfc6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -42,7 +42,7 @@ public class StressMetrics
private final Thread thread;
private volatile boolean stop = false;
private volatile boolean cancelled = false;
- private final Uncertainty opRateUncertainty = new Uncertainty();
+ private final Uncertainty rowRateUncertainty = new Uncertainty();
private final CountDownLatch stopped = new CountDownLatch(1);
private final Timing timing = new Timing();
@@ -68,6 +68,7 @@ public class StressMetrics
Thread.sleep(logIntervalMillis);
else
Thread.sleep(sleep);
+
update();
} catch (InterruptedException e)
{
@@ -86,6 +87,7 @@ public class StressMetrics
}
finally
{
+ rowRateUncertainty.wakeAll();
stopped.countDown();
}
}
@@ -99,7 +101,7 @@ public class StressMetrics
public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
{
- opRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
+ rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
}
public void cancel()
@@ -107,7 +109,7 @@ public class StressMetrics
cancelled = true;
stop = true;
thread.interrupt();
- opRateUncertainty.wakeAll();
+ rowRateUncertainty.wakeAll();
}
public void stop() throws InterruptedException
@@ -120,8 +122,11 @@ public class StressMetrics
private void update() throws InterruptedException
{
TimingInterval interval = timing.snapInterval();
- printRow("", interval, timing.getHistory(), opRateUncertainty, output);
- opRateUncertainty.update(interval.adjustedOpRate());
+ if (interval.partitionCount != 0)
+ printRow("", interval, timing.getHistory(), rowRateUncertainty, output);
+ rowRateUncertainty.update(interval.adjustedRowRate());
+ if (timing.done())
+ stop = true;
}
@@ -132,14 +137,15 @@ public class StressMetrics
private static void printHeader(String prefix, PrintStream output)
{
- output.println(prefix + String.format(HEADFORMAT, "partitions","op/s", "pk/s", "row/s","mean","med",".95",".99",".999","max","time","stderr"));
+ output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr"));
}
private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
{
output.println(prefix + String.format(ROWFORMAT,
- total.partitionCount,
- interval.realOpRate(),
+ total.operationCount,
+ interval.adjustedRowRate(),
+ interval.opRate(),
interval.partitionRate(),
interval.rowRate(),
interval.meanLatency(),
@@ -157,7 +163,7 @@ public class StressMetrics
output.println("\n");
output.println("Results:");
TimingInterval history = timing.getHistory();
- output.println(String.format("op rate : %.0f", history.realOpRate()));
+ output.println(String.format("op rate : %.0f", history.opRate()));
output.println(String.format("partition rate : %.0f", history.partitionRate()));
output.println(String.format("row rate : %.0f", history.rowRate()));
output.println(String.format("latency mean : %.1f", history.meanLatency()));
@@ -181,7 +187,7 @@ public class StressMetrics
printRow(String.format(formatstr, ids.get(i)),
summarise.get(i).timing.getHistory(),
summarise.get(i).timing.getHistory(),
- summarise.get(i).opRateUncertainty,
+ summarise.get(i).rowRateUncertainty,
out
);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 4e09775..de561f3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -24,15 +24,18 @@ package org.apache.cassandra.stress;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.RatioDistributionFactory;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.generate.values.Booleans;
import org.apache.cassandra.stress.generate.values.Bytes;
import org.apache.cassandra.stress.generate.values.Generator;
@@ -88,7 +91,7 @@ public class StressProfile implements Serializable
public String keyspaceName;
public String tableName;
private Map<String, GeneratorConfig> columnConfigs;
- private Map<String, String> queries;
+ private Map<String, StressYaml.QueryDef> queries;
private Map<String, String> insert;
transient volatile TableMetadata tableMetaData;
@@ -97,11 +100,11 @@ public class StressProfile implements Serializable
transient volatile BatchStatement.Type batchType;
transient volatile DistributionFactory partitions;
- transient volatile RatioDistributionFactory pervisit;
- transient volatile RatioDistributionFactory perbatch;
+ transient volatile RatioDistributionFactory selectchance;
transient volatile PreparedStatement insertStatement;
transient volatile Integer thriftInsertId;
+ transient volatile Map<String, SchemaQuery.ArgSelect> argSelects;
transient volatile Map<String, PreparedStatement> queryStatements;
transient volatile Map<String, Integer> thriftQueryIds;
@@ -242,13 +245,18 @@ public class StressProfile implements Serializable
ThriftClient tclient = settings.getThriftClient();
Map<String, PreparedStatement> stmts = new HashMap<>();
Map<String, Integer> tids = new HashMap<>();
- for (Map.Entry<String, String> e : queries.entrySet())
+ Map<String, SchemaQuery.ArgSelect> args = new HashMap<>();
+ for (Map.Entry<String, StressYaml.QueryDef> e : queries.entrySet())
{
- stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue()));
- tids.put(e.getKey().toLowerCase(), tclient.prepare_cql3_query(e.getValue(), Compression.NONE));
+ stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue().cql));
+ tids.put(e.getKey().toLowerCase(), tclient.prepare_cql3_query(e.getValue().cql, Compression.NONE));
+ args.put(e.getKey().toLowerCase(), e.getValue().fields == null
+ ? SchemaQuery.ArgSelect.MULTIROW
+ : SchemaQuery.ArgSelect.valueOf(e.getValue().fields.toUpperCase()));
}
thriftQueryIds = tids;
queryStatements = stmts;
+ argSelects = args;
}
catch (TException e)
{
@@ -260,7 +268,9 @@ public class StressProfile implements Serializable
// TODO validation
name = name.toLowerCase();
- return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL);
+ if (!queryStatements.containsKey(name))
+ throw new IllegalArgumentException("No query defined with name " + name);
+ return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name));
}
public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings)
@@ -328,18 +338,37 @@ public class StressProfile implements Serializable
insert = new HashMap<>();
lowerCase(insert);
- partitions = OptionDistribution.get(!insert.containsKey("partitions") ? "fixed(1)" : insert.remove("partitions"));
- pervisit = OptionRatioDistribution.get(!insert.containsKey("pervisit") ? "fixed(1)/1" : insert.remove("pervisit"));
- perbatch = OptionRatioDistribution.get(!insert.containsKey("perbatch") ? "fixed(1)/1" : insert.remove("perbatch"));
- batchType = !insert.containsKey("batchtype") ? BatchStatement.Type.LOGGED : BatchStatement.Type.valueOf(insert.remove("batchtype"));
+ partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
+ selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+ batchType = settings.insert.batchType != null
+ ? settings.insert.batchType
+ : !insert.containsKey("batchtype")
+ ? BatchStatement.Type.LOGGED
+ : BatchStatement.Type.valueOf(insert.remove("batchtype"));
if (!insert.isEmpty())
throw new IllegalArgumentException("Unrecognised insert option(s): " + insert);
+ Distribution visits = settings.insert.visits.get();
+ // these min/max are not absolutely accurate if selectchance < 1, but they're close enough to
+ // guarantee the vast majority of actions occur in these bounds
+ double minBatchSize = selectchance.get().min() * partitions.get().minValue() * generator.minRowCount * (1d / visits.maxValue());
+ double maxBatchSize = selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount * (1d / visits.minValue());
+ System.out.printf("Generating batches with [%d..%d] partitions and [%.0f..%.0f] rows (of [%.0f..%.0f] total rows in the partitions)\n",
+ partitions.get().minValue(), partitions.get().maxValue(),
+ minBatchSize, maxBatchSize,
+ partitions.get().minValue() * generator.minRowCount,
+ partitions.get().maxValue() * generator.maxRowCount);
if (generator.maxRowCount > 100 * 1000 * 1000)
System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))\n", generator.maxRowCount);
- if (perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount > 100000)
+ if (batchType == BatchStatement.Type.LOGGED && maxBatchSize > 65535)
+ {
+ System.err.printf("ERROR: You have defined a workload that generates batches with more than 65k rows (%.0f), but have required the use of LOGGED batches. There is a 65k row limit on a single batch.\n",
+ selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount);
+ System.exit(1);
+ }
+ if (maxBatchSize > 100000)
System.err.printf("WARNING: You have defined a schema that permits very large batches (%.0f max rows (>100K)). This may OOM this stress client, or the server.\n",
- perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount);
+ selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount);
JavaDriverClient client = settings.getJavaDriverClient();
String query = sb.toString();
@@ -356,10 +385,20 @@ public class StressProfile implements Serializable
}
}
- return new SchemaInsert(timer, generator, settings, partitions.get(), pervisit.get(), perbatch.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+ return new SchemaInsert(timer, generator, settings, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+ }
+
+ // Resolves an option value with precedence: an explicitly provided value ('first'),
+ // then the yaml map entry for 'key', then 'defValue'.
+ // NOTE: the map entry is always removed (consumed) before 'first' is checked, so that
+ // leftover keys can later be reported as unrecognised options even when overridden.
+ private static <E> E select(E first, String key, String defValue, Map<String, String> map, Function<String, E> builder)
+ {
+ String val = map.remove(key);
+ if (first != null)
+ return first;
+ if (val != null)
+ return builder.apply(val);
+ return builder.apply(defValue);
}
- public PartitionGenerator newGenerator(StressSettings settings)
+ public PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds)
{
if (generatorFactory == null)
{
@@ -371,7 +410,7 @@ public class StressProfile implements Serializable
}
}
- return generatorFactory.newGenerator();
+ return generatorFactory.newGenerator(settings, seeds);
}
private class GeneratorFactory
@@ -393,9 +432,9 @@ public class StressProfile implements Serializable
valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
}
- PartitionGenerator newGenerator()
+ PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds)
{
- return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns));
+ return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), settings.generate.order, seeds);
}
List<Generator> get(List<ColumnInfo> columnInfos)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index deea1fb..b6efc5e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -30,8 +30,14 @@ public class StressYaml
public String table;
public String table_definition;
- public List<Map<String,Object>> columnspec;
- public Map<String,String> queries;
- public Map<String,String> insert;
+ public List<Map<String, Object>> columnspec;
+ public Map<String, QueryDef> queries;
+ public Map<String, String> insert;
+
+ // A named query from the yaml profile: the CQL text plus an optional 'fields'
+ // directive. The interpretation of 'fields' is defined by the consumer
+ // (presumably StressProfile; not visible here -- confirm against caller).
+ public static class QueryDef
+ {
+ // the CQL statement text
+ public String cql;
+ // selector for how bind values are drawn from generated rows
+ public String fields;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
index 13fae0d..4062b58 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
@@ -55,4 +55,11 @@ public class DistributionInverted extends Distribution
wrapped.setSeed(seed);
}
+ /** Returns the inversion of the given distribution; unwraps an already-inverted
+  * distribution rather than double-wrapping it. */
+ public static Distribution invert(Distribution distribution)
+ {
+ if (distribution instanceof DistributionInverted)
+ return ((DistributionInverted) distribution).wrapped;
+ return new DistributionInverted(distribution);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
new file mode 100644
index 0000000..9771134
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
@@ -0,0 +1,90 @@
package org.apache.cassandra.stress.generate;
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


import java.util.Arrays;
import java.util.Random;

import org.apache.cassandra.stress.Stress;

/**
 * Wraps a delegate distribution, dividing its value range into a fixed number of
 * equal-probability "quanta". next() samples the delegate to select a quantum, then
 * returns a uniformly random value within that quantum's bounds, smoothing a skewed
 * distribution into a piecewise-uniform one.
 */
public class DistributionQuantized extends Distribution
{

    // the distribution used to select which quantum a sample falls in
    final Distribution delegate;
    // bounds[i] .. bounds[i + 1] is the [inclusive, exclusive) value range of quantum i
    final long[] bounds;
    // picks a uniform value inside the selected quantum
    final Random random = new Random();

    public DistributionQuantized(Distribution delegate, int quantas)
    {
        this.delegate = delegate;
        this.bounds = new long[quantas + 1];
        bounds[0] = delegate.minValue();
        bounds[quantas] = delegate.maxValue() + 1;
        // interior bounds are the delegate's quantiles, so each quantum carries equal probability mass
        for (int i = 1 ; i < quantas ; i++)
            bounds[i] = delegate.inverseCumProb(i / (double) quantas);
    }

    @Override
    public long next()
    {
        int quanta = quanta(delegate.next());
        return bounds[quanta] + (long) (random.nextDouble() * ((bounds[quanta + 1] - bounds[quanta])));
    }

    public double nextDouble()
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public long inverseCumProb(double cumProb)
    {
        long val = delegate.inverseCumProb(cumProb);
        int quanta = quanta(val);
        if (quanta < 0)
            return bounds[0];
        if (quanta >= bounds.length - 1)
            return bounds[bounds.length - 1] - 1;
        // map the residual probability within this quantum linearly onto the quantum's value range
        cumProb -= (quanta / ((double) bounds.length - 1));
        cumProb *= (double) bounds.length - 1;
        return bounds[quanta] + (long) (cumProb * (bounds[quanta + 1] - bounds[quanta]));
    }

    // index of the quantum containing val: the largest i with bounds[i] <= val
    int quanta(long val)
    {
        int i = Arrays.binarySearch(bounds, val);
        if (i < 0)
            return -2 -i;
        return i - 1;
    }

    public void setSeed(long seed)
    {
        // BUG FIX: also seed the internal uniform source; previously only the delegate was
        // reseeded, so two instances given the same seed still produced different sequences
        random.setSeed(seed);
        delegate.setSeed(seed);
    }

    // manual smoke test: prints the quantized distribution via the stress "print" command
    public static void main(String[] args) throws Exception
    {
        Stress.main(new String[] { "print", "dist=qextreme(1..1M,2,2)"});
    }

}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
new file mode 100644
index 0000000..455fec4
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.stress.generate;

import java.util.Random;

import org.apache.commons.math3.random.RandomGenerator;

// based on http://en.wikipedia.org/wiki/Xorshift, but periodically we reseed with our stronger random generator
// note it is also non-atomically updated, so expects to be used by a single thread
public class FasterRandom implements RandomGenerator
{
    // stronger generator used to periodically refresh the xorshift state
    final Random random = new Random();

    private long seed;
    private int reseed;

    public void setSeed(int seed)
    {
        setSeed((long) seed);
    }

    public void setSeed(int[] ints)
    {
        if (ints.length > 1)
            // BUG FIX: mask the low word so its sign bit does not sign-extend and
            // clobber the high 32 bits of the combined seed
            setSeed(((long) ints[0] << 32) | (ints[1] & 0xFFFFFFFFL));
        else
            setSeed(ints[0]);
    }

    public void setSeed(long seed)
    {
        this.seed = seed;
        rollover();
    }

    // re-derive the xorshift state from the stronger generator; called every 32 draws
    private void rollover()
    {
        this.reseed = 0;
        random.setSeed(seed);
        seed = random.nextLong();
    }

    public void nextBytes(byte[] bytes)
    {
        int i = 0;
        while (i < bytes.length)
        {
            long next = nextLong();
            // BUG FIX: take at most 8 bytes from each long; previously the inner loop ran to
            // bytes.length from a single (exhausted) long, zero-filling everything past index 7
            for (int c = 0 ; c < 8 && i < bytes.length ; c++)
            {
                bytes[i++] = (byte) (next & 0xFF);
                next >>>= 8;
            }
        }
    }

    public int nextInt()
    {
        return (int) nextLong();
    }

    public int nextInt(int i)
    {
        // NOTE: modulo introduces a slight bias for large i; acceptable for a stress workload.
        // Math.abs is safe here because |x % i| < i <= Integer.MAX_VALUE.
        return Math.abs((int) nextLong() % i);
    }

    public long nextLong()
    {
        if (++this.reseed == 32)
            rollover();

        // standard 64-bit xorshift step, scaled by a large odd multiplier (xorshift*)
        long seed = this.seed;
        seed ^= seed >> 12;
        seed ^= seed << 25;
        seed ^= seed >> 27;
        this.seed = seed;
        return seed * 2685821657736338717L;
    }

    public boolean nextBoolean()
    {
        return ((int) nextLong() & 1) == 1;
    }

    public float nextFloat()
    {
        // BUG FIX: RandomGenerator.nextFloat must return a uniform value in [0, 1);
        // reinterpreting raw bits (intBitsToFloat) could yield NaN, infinities and a
        // wildly non-uniform magnitude distribution. Use the top 24 bits instead.
        return (nextLong() >>> 40) * 0x1.0p-24f;
    }

    public double nextDouble()
    {
        // BUG FIX: as nextFloat -- uniform in [0, 1) from the top 53 bits
        return (nextLong() >>> 11) * 0x1.0p-53;
    }

    public double nextGaussian()
    {
        return random.nextGaussian();
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
index f05e95b..18f5732 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
@@ -23,24 +23,34 @@ package org.apache.cassandra.stress.generate;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.stress.generate.values.Generator;
// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
public class Partition
{
private long idseed;
+ private Seed seed;
private final Object[] partitionKey;
private final PartitionGenerator generator;
private final RowIterator iterator;
@@ -55,31 +65,32 @@ public class Partition
iterator = new SingleRowIterator();
}
- void setSeed(long seed)
+ void setSeed(Seed seed)
{
long idseed = 0;
for (int i = 0 ; i < partitionKey.length ; i++)
{
Generator generator = this.generator.partitionKey.get(i);
// set the partition key seed based on the current work item we're processing
- generator.setSeed(seed);
+ generator.setSeed(seed.seed);
Object key = generator.generate();
partitionKey[i] = key;
// then contribute this value to the data seed
idseed = seed(key, generator.type, idseed);
}
+ this.seed = seed;
this.idseed = idseed;
}
- public RowIterator iterator(double useChance)
+ public RowIterator iterator(double useChance, boolean isWrite)
{
- iterator.reset(useChance, 0);
+ iterator.reset(useChance, 0, 1, isWrite);
return iterator;
}
- public RowIterator iterator(int targetCount)
+ public RowIterator iterator(int targetCount, boolean isWrite)
{
- iterator.reset(Double.NaN, targetCount);
+ iterator.reset(Double.NaN, targetCount, 1, isWrite);
return iterator;
}
@@ -87,12 +98,12 @@ public class Partition
{
boolean done;
- void reset(double useChance, int targetCount)
+ void reset(double useChance, int targetCount, int batches, boolean isWrite)
{
done = false;
}
- public Iterable<Row> batch(double ratio)
+ public Iterable<Row> next()
{
if (done)
return Collections.emptyList();
@@ -110,6 +121,12 @@ public class Partition
{
return done;
}
+
+ public void markWriteFinished()
+ {
+ assert done;
+ generator.seeds.markFinished(seed);
+ }
}
public abstract class RowIterator
@@ -117,10 +134,10 @@ public class Partition
// we reuse the row object to save garbage
final Row row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
- public abstract Iterable<Row> batch(double ratio);
- abstract void reset(double useChance, int targetCount);
-
+ public abstract Iterable<Row> next();
public abstract boolean done();
+ public abstract void markWriteFinished();
+ abstract void reset(double useChance, int targetCount, int batches, boolean isWrite);
public Partition partition()
{
@@ -128,31 +145,40 @@ public class Partition
}
}
- // permits iterating a random subset of the procedurally generated rows in this partition; this is the only mechanism for visiting rows
+ // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
// we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
// and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
- // if we do, we generate all possible values the children can take, and repeat the process. So at any one time we are using space proportional
+ // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
// to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+ // TODO : guarantee at least one row is always returned
+ // TODO : support first/last row, and constraining reads to rows we know are populated
class MultiRowIterator extends RowIterator
{
// probability any single row will be generated in this iteration
double useChance;
- double expectedRowCount;
- // the current seed in use at any given level; used to save recalculating it for each row, so we only need to recalc
- // from prior row
+ // the seed used to generate the current values for the clustering components at each depth;
+ // used to save recalculating it for each row, so we only need to recalc from prior row.
final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
// the components remaining to be visited for each level of the current stack
- final Queue<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+ final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
// we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
// so that we know with what chance we reached there, and we adjust our roll at that level by that amount
- double[] chancemodifier = new double[generator.clusteringComponents.size()];
- double[] rollmodifier = new double[generator.clusteringComponents.size()];
+ final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+ final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+
+ // track where in the partition we are, and where we are limited to
+ final int[] position = new int[generator.clusteringComponents.size()];
+ final int[] limit = new int[position.length];
+ int batchSize;
+ boolean returnedOne;
+ boolean forceReturnOne;
- // reusable set for generating unique clustering components
+ // reusable collections for generating unique and sorted clustering components
final Set<Object> unique = new HashSet<>();
+ final List<Comparable> tosort = new ArrayList<>();
final Random random = new Random();
MultiRowIterator()
@@ -163,126 +189,262 @@ public class Partition
chancemodifier[0] = generator.clusteringChildAverages[0];
}
- void reset(double useChance, int targetCount)
+ // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+ // count to decide how much we should return in one iteration
+ void reset(double useChance, int targetCount, int batches, boolean isWrite)
{
+ if (this.useChance < 1d)
+ {
+ // we clear our prior roll-modifiers if the use chance was previously less-than zero
+ Arrays.fill(rollmodifier, 1d);
+ Arrays.fill(chancemodifier, 1d);
+ }
+
+ // set the seed for the first clustering component
generator.clusteringComponents.get(0).setSeed(idseed);
+ int[] position = seed.position;
+
+ // calculate how many first clustering components we'll generate, and how many total rows this predicts
int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
- this.expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0];
+ int expectedRowCount;
+
+ if (!isWrite && position != null)
+ {
+ expectedRowCount = 0;
+ for (int i = 0 ; i < position.length ; i++)
+ {
+ expectedRowCount += position[i] * generator.clusteringChildAverages[i];
+ limit[i] = position[i];
+ }
+ }
+ else
+ {
+ expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0];
+ if (isWrite)
+ batches *= seed.visits;
+ Arrays.fill(limit, Integer.MAX_VALUE);
+ }
+
+ batchSize = Math.max(1, expectedRowCount / batches);
if (Double.isNaN(useChance))
- useChance = Math.max(0d, Math.min(1d, targetCount / expectedRowCount));
+ useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+ // clear any remnants of the last iteration, wire up our constants, and fill in the first clustering components
+ this.useChance = useChance;
+ this.returnedOne = false;
for (Queue<?> q : clusteringComponents)
q.clear();
-
- this.useChance = useChance;
clusteringSeeds[0] = idseed;
- clusteringComponents[0].add(this);
fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
- advance(0, 1f);
+
+ // seek to our start position
+ seek(isWrite ? position : null);
}
- void fill(int component)
+ // generate the clustering components for the provided depth; requires preceding components
+ // to have been generated and their seeds populated into clusteringSeeds
+ void fill(int depth)
{
- long seed = clusteringSeeds[component - 1];
- Generator gen = generator.clusteringComponents.get(component);
+ long seed = clusteringSeeds[depth - 1];
+ Generator gen = generator.clusteringComponents.get(depth);
gen.setSeed(seed);
- clusteringSeeds[component] = seed(clusteringComponents[component - 1].peek(), generator.clusteringComponents.get(component - 1).type, seed);
- fill(clusteringComponents[component], (int) gen.clusteringDistribution.next(), gen);
+ clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+ fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
}
+ // generate the clustering components into the queue
void fill(Queue<Object> queue, int count, Generator generator)
{
if (count == 1)
{
queue.add(generator.generate());
+ return;
}
- else
+
+ switch (Partition.this.generator.order)
{
- unique.clear();
- for (int i = 0 ; i < count ; i++)
- {
- Object next = generator.generate();
- if (unique.add(next))
- queue.add(next);
- }
+ case SORTED:
+ if (Comparable.class.isAssignableFrom(generator.clazz))
+ {
+ tosort.clear();
+ for (int i = 0 ; i < count ; i++)
+ tosort.add((Comparable) generator.generate());
+ Collections.sort(tosort);
+ for (int i = 0 ; i < count ; i++)
+ if (i == 0 || tosort.get(i - 1).compareTo(i) < 0)
+ queue.add(tosort.get(i));
+ break;
+ }
+ case ARBITRARY:
+ unique.clear();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ queue.add(next);
+ }
+ break;
+ case SHUFFLED:
+ unique.clear();
+ tosort.clear();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ tosort.add(new RandomOrder(next));
+ }
+ Collections.sort(tosort);
+ for (Object o : tosort)
+ queue.add(((RandomOrder)o).value);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ // seek to the provided position (or the first entry if null)
+ private void seek(int[] position)
+ {
+ if (position == null)
+ {
+ this.position[0] = -1;
+ clusteringComponents[0].addFirst(this);
+ advance(0);
+ return;
+ }
+
+ assert position.length == clusteringComponents.length;
+ for (int i = 0 ; i < position.length ; i++)
+ {
+ if (i != 0)
+ fill(i);
+ for (int c = position[i] ; c > 0 ; c--)
+ clusteringComponents[i].poll();
+ row.row[i] = clusteringComponents[i].peek();
}
+ System.arraycopy(position, 0, this.position, 0, position.length);
}
- private boolean advance(double continueChance)
+ // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+ // to move the iterator to the next item
+ void advance()
{
- // we always start at the leaf level
+ // we are always at the leaf level when this method is invoked
+ // so we calculate the seed for generating the row by combining the seed that generated the clustering components
int depth = clusteringComponents.length - 1;
- // fill the row with the position we *were* at (unless pre-start)
+ long parentSeed = clusteringSeeds[depth];
+ long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+ // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
{
Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
- long seed = clusteringSeeds[depth];
- seed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, seed);
- gen.setSeed(seed);
+ gen.setSeed(rowSeed);
row.row[i] = gen.generate();
}
- clusteringComponents[depth].poll();
+ returnedOne = true;
+ forceReturnOne = false;
- return advance(depth, continueChance);
+ // then we advance the leaf level
+ advance(depth);
}
- private boolean advance(int depth, double continueChance)
+ private void advance(int depth)
{
// advance the leaf component
clusteringComponents[depth].poll();
+ position[depth]++;
while (true)
{
if (clusteringComponents[depth].isEmpty())
{
+ // if we've run out of clustering components at this level, ascend
if (depth == 0)
- return false;
+ return;
depth--;
clusteringComponents[depth].poll();
+ position[depth]++;
continue;
}
- // the chance of descending is the uniform use chance, multiplied by the number of children
+ if (depth == 0 && !returnedOne && clusteringComponents[0].size() == 1)
+ forceReturnOne = true;
+
+ // the chance of descending is the uniform usechance, multiplied by the number of children
// we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
// then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
// chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
// chance of beating this next roll
double thischance = useChance * chancemodifier[depth];
- if (thischance > 0.999f || thischance >= random.nextDouble())
+ if (forceReturnOne || thischance > 0.999f || thischance >= random.nextDouble())
{
+ // if we're descending, we fill in our clustering component and increase our depth
row.row[depth] = clusteringComponents[depth].peek();
depth++;
if (depth == clusteringComponents.length)
break;
- rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
- chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth];
+ // if we haven't reached the leaf, we update our probability statistics, fill in all of
+ // this level's clustering components, and repeat
+ if (useChance < 1d)
+ {
+ rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+ chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth];
+ }
+ position[depth] = 0;
fill(depth);
continue;
}
+ // if we don't descend, we remove the clustering suffix we've skipped and continue
clusteringComponents[depth].poll();
+ position[depth]++;
}
-
- return continueChance >= 1.0d || continueChance >= random.nextDouble();
}
- public Iterable<Row> batch(final double ratio)
+ public Iterable<Row> next()
{
- final double continueChance = 1d - (Math.pow(ratio, expectedRowCount * useChance));
+ final int[] limit = position.clone();
+ int remainingSize = batchSize;
+ for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++)
+ {
+ limit[i] += remainingSize / generator.clusteringChildAverages[i];
+ remainingSize %= generator.clusteringChildAverages[i];
+ }
+ assert remainingSize == 0;
+ for (int i = limit.length - 1 ; i > 0 ; i--)
+ {
+ if (limit[i] > generator.clusteringChildAverages[i])
+ {
+ limit[i - 1] += limit[i] / generator.clusteringChildAverages[i];
+ limit[i] %= generator.clusteringChildAverages[i];
+ }
+ }
+ for (int i = 0 ; i < limit.length ; i++)
+ {
+ if (limit[i] < this.limit[i])
+ break;
+ limit[i] = Math.min(limit[i], this.limit[i]);
+ }
return new Iterable<Row>()
{
public Iterator<Row> iterator()
{
return new Iterator<Row>()
{
- boolean hasNext = true;
+
public boolean hasNext()
{
- return hasNext;
+ if (done())
+ return false;
+ for (int i = 0 ; i < position.length ; i++)
+ if (position[i] < limit[i])
+ return true;
+ return false;
}
public Row next()
{
- hasNext = advance(continueChance);
+ advance();
return row;
}
@@ -300,26 +462,37 @@ public class Partition
return clusteringComponents[0].isEmpty();
}
+ public void markWriteFinished()
+ {
+ if (done())
+ generator.seeds.markFinished(seed);
+ else
+ generator.seeds.markVisited(seed, position.clone());
+ }
+
public Partition partition()
{
return Partition.this;
}
}
- public String getKeyAsString()
+ private static class RandomOrder implements Comparable<RandomOrder>
{
- StringBuilder sb = new StringBuilder();
- int i = 0;
- for (Object key : partitionKey)
+ final int order = ThreadLocalRandom.current().nextInt();
+ final Object value;
+ private RandomOrder(Object value)
{
- if (i > 0)
- sb.append("|");
- AbstractType type = generator.partitionKey.get(i++).type;
- sb.append(type.getString(type.decompose(key)));
+ this.value = value;
+ }
+
+ public int compareTo(RandomOrder that)
+ {
+ return Integer.compare(this.order, that.order);
}
- return sb.toString();
}
+ // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+ // any children of this child
static long seed(Object object, AbstractType type, long seed)
{
if (object instanceof ByteBuffer)
@@ -355,6 +528,20 @@ public class Partition
return partitionKey[i];
}
+ public String getKeyAsString()
+ {
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ for (Object key : partitionKey)
+ {
+ if (i > 0)
+ sb.append("|");
+ AbstractType type = generator.partitionKey.get(i++).type;
+ sb.append(type.getString(type.decompose(key)));
+ }
+ return sb.toString();
+ }
+
// used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
public ByteBuffer getToken()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index d05350d..128d2f5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -30,18 +30,27 @@ import java.util.NoSuchElementException;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.values.Generator;
public class PartitionGenerator
{
+ public static enum Order
+ {
+ ARBITRARY, SHUFFLED, SORTED
+ }
+
public final double maxRowCount;
+ public final double minRowCount;
final List<Generator> partitionKey;
final List<Generator> clusteringComponents;
final List<Generator> valueComponents;
final int[] clusteringChildAverages;
private final Map<String, Integer> indexMap;
+ final Order order;
+ final SeedManager seeds;
final List<Partition> recyclable = new ArrayList<>();
int partitionsInUse = 0;
@@ -51,18 +60,25 @@ public class PartitionGenerator
partitionsInUse = 0;
}
- public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents)
+ public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order, SeedManager seeds)
{
this.partitionKey = partitionKey;
this.clusteringComponents = clusteringComponents;
this.valueComponents = valueComponents;
+ this.order = order;
+ this.seeds = seeds;
this.clusteringChildAverages = new int[clusteringComponents.size()];
for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--)
clusteringChildAverages[i] = (int) (i < (clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1);
double maxRowCount = 1d;
+ double minRowCount = 1d;
for (Generator component : clusteringComponents)
+ {
maxRowCount *= component.clusteringDistribution.maxValue();
+ minRowCount *= component.clusteringDistribution.minValue();
+ }
this.maxRowCount = maxRowCount;
+ this.minRowCount = minRowCount;
this.indexMap = new HashMap<>();
int i = 0;
for (Generator generator : partitionKey)
@@ -72,6 +88,11 @@ public class PartitionGenerator
indexMap.put(generator.name, i++);
}
+ public boolean permitNulls(int index)
+ {
+ return !(index < 0 || index < clusteringComponents.size());
+ }
+
public int indexOf(String name)
{
Integer i = indexMap.get(name);
@@ -80,11 +101,14 @@ public class PartitionGenerator
return i;
}
- public Partition generate(long seed)
+ public Partition generate(Operation op)
{
if (recyclable.size() <= partitionsInUse || recyclable.get(partitionsInUse) == null)
recyclable.add(new Partition(this));
+ Seed seed = seeds.next(op);
+ if (seed == null)
+ return null;
Partition partition = recyclable.get(partitionsInUse++);
partition.setSeed(seed);
return partition;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
index 37ad4c5..c71945a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
@@ -39,6 +39,11 @@ public class RatioDistribution
return Math.max(0f, Math.min(1f, distribution.nextDouble() / divisor));
}
+ public double min()
+ {
+ return Math.min(1d, distribution.minValue() / divisor);
+ }
+
public double max()
{
return Math.min(1d, distribution.maxValue() / divisor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
new file mode 100644
index 0000000..f427608
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.generate;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.cassandra.stress.util.DynamicList;
+
+public class Seed implements Comparable<Seed>
+{
+
+ public final long seed;
+ final int visits;
+
+ DynamicList.Node poolNode;
+ volatile int[] position;
+ volatile State state = State.HELD;
+
+ private static final AtomicReferenceFieldUpdater<Seed, Seed.State> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, "state");
+
+ public int compareTo(Seed that)
+ {
+ return Long.compare(this.seed, that.seed);
+ }
+
+ static enum State
+ {
+ HELD, AVAILABLE
+ }
+
+ Seed(long seed, int visits)
+ {
+ this.seed = seed;
+ this.visits = visits;
+ }
+
+ boolean take()
+ {
+ return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD);
+ }
+
+ void yield()
+ {
+ state = State.AVAILABLE;
+ }
+
+ public int[] position()
+ {
+ return position;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
deleted file mode 100644
index d579223..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-public interface SeedGenerator
-{
-
- long next(long workIndex);
-
-}