You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/04 19:04:07 UTC
[2/3] cassandra git commit: cassandra-stress supports validation
operations over user profiles
cassandra-stress supports validation operations over user profiles
patch by benedict; reviewed by snazy for CASSANDRA-8773
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3bee990c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3bee990c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3bee990c
Branch: refs/heads/trunk
Commit: 3bee990ca2e46bf0fd5742c56b5d00cc0566950b
Parents: e6f0279
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon May 4 18:01:38 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon May 4 18:01:38 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 13 +-
.../apache/cassandra/stress/StressAction.java | 4 +-
.../apache/cassandra/stress/StressProfile.java | 25 +-
.../stress/generate/PartitionGenerator.java | 9 +
.../stress/generate/PartitionIterator.java | 166 +++++++--
.../apache/cassandra/stress/generate/Row.java | 15 +-
.../SampledOpDistributionFactory.java | 26 +-
.../operations/userdefined/SchemaInsert.java | 14 +-
.../operations/userdefined/SchemaQuery.java | 7 +-
.../operations/userdefined/SchemaStatement.java | 35 +-
.../userdefined/ValidatingSchemaQuery.java | 359 +++++++++++++++++++
.../SettingsCommandPreDefinedMixed.java | 9 +-
.../stress/settings/SettingsCommandUser.java | 9 +-
.../stress/settings/ValidationType.java | 29 --
15 files changed, 581 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7689ab..3a2daa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.6
+ * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
* Add support for rate limiting log messages (CASSANDRA-9029)
* Log the partition key with tombstone warnings (CASSANDRA-8561)
* Reduce runWithCompactionsDisabled poll interval to 1ms (CASSANDRA-9271)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index f4ac5ee..1179f71 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -104,10 +104,7 @@ public abstract class Operation
if (seed == null)
break;
- if (spec.useRatio == null)
- success = partitionCache.get(i).reset(seed, spec.targetCount, isWrite());
- else
- success = partitionCache.get(i).reset(seed, spec.useRatio.next(), isWrite());
+ success = reset(seed, partitionCache.get(i));
}
}
partitionCount = i;
@@ -119,6 +116,14 @@ public abstract class Operation
return !partitions.isEmpty();
}
+ protected boolean reset(Seed seed, PartitionIterator iterator)
+ {
+ if (spec.useRatio == null)
+ return iterator.reset(seed, spec.targetCount, isWrite());
+ else
+ return iterator.reset(seed, spec.useRatio.next(), isWrite());
+ }
+
public boolean isWrite()
{
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index f906a55..158a278 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -88,7 +88,9 @@ public class StressAction implements Runnable
// warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
int iterations = 50000 * settings.node.nodes.size();
- int threads = 20;
+ int threads = 100;
+ if (iterations > settings.command.count && settings.command.count > 0)
+ return;
if (settings.rate.maxThreads > 0)
threads = Math.min(threads, settings.rate.maxThreads);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 6c73214..49c4682 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.generate.values.*;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
+import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
import org.apache.cassandra.stress.settings.*;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
@@ -75,6 +76,7 @@ public class StressProfile implements Serializable
transient volatile RatioDistributionFactory selectchance;
transient volatile PreparedStatement insertStatement;
transient volatile Integer thriftInsertId;
+ transient volatile List<ValidatingSchemaQuery.Factory> validationFactories;
transient volatile Map<String, SchemaQuery.ArgSelect> argSelects;
transient volatile Map<String, PreparedStatement> queryStatements;
@@ -265,12 +267,11 @@ public class StressProfile implements Serializable
}
}
- // TODO validation
name = name.toLowerCase();
if (!queryStatements.containsKey(name))
throw new IllegalArgumentException("No query defined with name " + name);
return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name),
- ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name));
+ ThriftConversion.fromThrift(settings.command.consistencyLevel), argSelects.get(name));
}
public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
@@ -388,6 +389,26 @@ public class StressProfile implements Serializable
return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
}
+ public List<ValidatingSchemaQuery> getValidate(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
+ {
+ if (validationFactories == null)
+ {
+ synchronized (this)
+ {
+ if (validationFactories == null)
+ {
+ maybeLoadSchemaInfo(settings);
+ validationFactories = ValidatingSchemaQuery.create(tableMetaData, settings);
+ }
+ }
+ }
+
+ List<ValidatingSchemaQuery> queries = new ArrayList<>();
+ for (ValidatingSchemaQuery.Factory factory : validationFactories)
+ queries.add(factory.create(timer, settings, generator, seedManager, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+ return queries;
+ }
+
private static <E> E select(E first, String key, String defValue, Map<String, String> map, Function<String, E> builder)
{
String val = map.remove(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index 9f88068..a7297c5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -101,4 +101,13 @@ public class PartitionGenerator
return clusteringComponents.get(c).type.decompose(v);
return valueComponents.get(c - clusteringComponents.size()).type.decompose(v);
}
+
+ public Object convert(int c, ByteBuffer v)
+ {
+ if (c < 0)
+ return partitionKey.get(-1-c).type.compose(v);
+ if (c < clusteringComponents.size())
+ return clusteringComponents.get(c).type.compose(v);
+ return valueComponents.get(c - clusteringComponents.size()).type.compose(v);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 0466edb..4906b95 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -36,10 +36,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.values.Generator;
+import org.apache.cassandra.utils.Pair;
// a partition is re-used to reduce garbage generation, as is its internal RowIterator
// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
@@ -50,8 +52,11 @@ import org.apache.cassandra.stress.generate.values.Generator;
public abstract class PartitionIterator implements Iterator<Row>
{
- abstract boolean reset(double useChance, int targetCount, boolean isWrite);
+ abstract boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order);
+ // picks random (inclusive) bounds to iterate, and returns them
+ public abstract Pair<Row, Row> resetToBounds(Seed seed, int clusteringComponentDepth);
+ PartitionGenerator.Order order;
long idseed;
Seed seed;
@@ -78,7 +83,7 @@ public abstract class PartitionIterator implements Iterator<Row>
this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
}
- private void setSeed(Seed seed)
+ void setSeed(Seed seed)
{
long idseed = 0;
for (int i = 0 ; i < partitionKey.length ; i++)
@@ -98,13 +103,15 @@ public abstract class PartitionIterator implements Iterator<Row>
public boolean reset(Seed seed, double useChance, boolean isWrite)
{
setSeed(seed);
- return reset(useChance, 0, isWrite);
+ this.order = generator.order;
+ return reset(useChance, 0, isWrite, PartitionIterator.this.order);
}
public boolean reset(Seed seed, int targetCount, boolean isWrite)
{
setSeed(seed);
- return reset(Double.NaN, targetCount, isWrite);
+ this.order = generator.order;
+ return reset(Double.NaN, targetCount, isWrite, PartitionIterator.this.order);
}
static class SingleRowIterator extends PartitionIterator
@@ -117,7 +124,15 @@ public abstract class PartitionIterator implements Iterator<Row>
super(generator, seedManager);
}
- boolean reset(double useChance, int targetCount, boolean isWrite)
+ public Pair<Row, Row> resetToBounds(Seed seed, int clusteringComponentDepth)
+ {
+ assert clusteringComponentDepth == 0;
+ setSeed(seed);
+ reset(1d, 1, false, PartitionGenerator.Order.SORTED);
+ return Pair.create(new Row(partitionKey), new Row(partitionKey));
+ }
+
+ boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order)
{
done = false;
this.isWrite = isWrite;
@@ -201,16 +216,11 @@ public abstract class PartitionIterator implements Iterator<Row>
*
* @return true if there is data to return, false otherwise
*/
- boolean reset(double useChance, int targetCount, boolean isWrite)
+ boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order)
{
this.isWrite = isWrite;
- if (this.useChance < 1d)
- {
- // we clear our prior roll-modifiers if the use chance was previously less-than zero
- Arrays.fill(rollmodifier, 1d);
- Arrays.fill(chancemodifier, 1d);
- }
+ this.order = order;
// set the seed for the first clustering component
generator.clusteringComponents.get(0).setSeed(idseed);
@@ -229,7 +239,7 @@ public abstract class PartitionIterator implements Iterator<Row>
if (Double.isNaN(useChance))
useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
- this.useChance = useChance;
+ setUseChance(useChance);
while (true)
{
@@ -239,8 +249,7 @@ public abstract class PartitionIterator implements Iterator<Row>
for (Queue<?> q : clusteringComponents)
q.clear();
- clusteringSeeds[0] = idseed;
- fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+ fill(0);
if (!isWrite)
{
@@ -249,8 +258,7 @@ public abstract class PartitionIterator implements Iterator<Row>
return true;
}
-
- int count = Math.max(1, expectedRowCount / seed.visits);
+ int count = seed.visits == 1 ? 1 + (int) generator.maxRowCount : Math.max(1, expectedRowCount / seed.visits);
position = seed.moveForwards(count);
isFirstWrite = position == 0;
setLastRow(position + count - 1);
@@ -266,6 +274,44 @@ public abstract class PartitionIterator implements Iterator<Row>
}
}
+ void setUseChance(double useChance)
+ {
+ if (this.useChance < 1d)
+ {
+ // we clear our prior roll-modifiers if the use chance was previously less than one
+ Arrays.fill(rollmodifier, 1d);
+ Arrays.fill(chancemodifier, 1d);
+ }
+ this.useChance = useChance;
+ }
+
+ public Pair<Row, Row> resetToBounds(Seed seed, int clusteringComponentDepth)
+ {
+ setSeed(seed);
+ setUseChance(1d);
+ if (clusteringComponentDepth == 0)
+ {
+ reset(1d, -1, false, PartitionGenerator.Order.SORTED);
+ return Pair.create(new Row(partitionKey), new Row(partitionKey));
+ }
+
+ this.order = PartitionGenerator.Order.SORTED;
+ assert clusteringComponentDepth <= clusteringComponents.length;
+ for (Queue<?> q : clusteringComponents)
+ q.clear();
+
+ fill(0);
+ Pair<int[], Object[]> bound1 = randomBound(clusteringComponentDepth);
+ Pair<int[], Object[]> bound2 = randomBound(clusteringComponentDepth);
+ if (compare(bound1.left, bound2.left) > 0) { Pair<int[], Object[]> tmp = bound1; bound1 = bound2; bound2 = tmp;}
+ Arrays.fill(lastRow, 0);
+ System.arraycopy(bound2.left, 0, lastRow, 0, bound2.left.length);
+ Arrays.fill(currentRow, 0);
+ System.arraycopy(bound1.left, 0, currentRow, 0, bound1.left.length);
+ seekToCurrentRow();
+ return Pair.create(new Row(partitionKey, bound1.right), new Row(partitionKey, bound2.right));
+ }
+
// returns expected row count
private int setNoLastRow(int firstComponentCount)
{
@@ -296,12 +342,39 @@ public abstract class PartitionIterator implements Iterator<Row>
// OR if that row does not exist, it is the last row prior to it
private int compareToLastRow(int depth)
{
+ int prev = 0;
for (int i = 0 ; i <= depth ; i++)
{
int p = currentRow[i], l = lastRow[i], r = clusteringComponents[i].size();
- if ((p == l) | (r == 1))
- continue;
- return p - l;
+ if (prev < 0)
+ {
+ // if we're behind our last position in theory, and have known more items to visit in practice
+ // we're definitely behind our last row
+ if (r > 1)
+ return -1;
+ // otherwise move forwards to see if we might have more to visit
+ }
+ else if (p > l)
+ {
+ // prev must be == 0, so if p > l, we're after our last row
+ return 1;
+ }
+ else if (p == l)
+ {
+ // if we're equal to our last row up to our current depth, then we need to loop and look forwards
+ }
+ else if (r == 1)
+ {
+ // if this is our last item in practice, store if we're behind our theoretical position
+ // and move forwards; if every remaining practical item is 1, we're at the last row
+ // otherwise we're before it
+ prev = p - l;
+ }
+ else
+ {
+ // p < l, and r > 1, so we're definitely not at the end
+ return -1;
+ }
}
return 0;
}
@@ -330,6 +403,14 @@ public abstract class PartitionIterator implements Iterator<Row>
}
}
+ private static int compare(int[] l, int[] r)
+ {
+ for (int i = 0 ; i < l.length ; i++)
+ if (l[i] != r[i])
+ return Integer.compare(l[i], r[i]);
+ return 0;
+ }
+
static enum State
{
END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
@@ -349,9 +430,12 @@ public abstract class PartitionIterator implements Iterator<Row>
clusteringComponents[0].addFirst(this);
return setHasNext(advance(0, true));
}
-
+ decompose(scalar, this.currentRow);
+ return seekToCurrentRow();
+ }
+ private State seekToCurrentRow()
+ {
int[] position = this.currentRow;
- decompose(scalar, position);
for (int i = 0 ; i < position.length ; i++)
{
if (i != 0)
@@ -399,7 +483,7 @@ public abstract class PartitionIterator implements Iterator<Row>
// normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
// to move the iterator to the next item
- void advance()
+ Row advance()
{
// we are always at the leaf level when this method is invoked
// so we calculate the seed for generating the row by combining the seed that generated the clustering components
@@ -407,16 +491,18 @@ public abstract class PartitionIterator implements Iterator<Row>
long parentSeed = clusteringSeeds[depth];
long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+ Row result = row.copy();
// and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
{
Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
gen.setSeed(rowSeed);
- row.row[i] = gen.generate();
+ result.row[i] = gen.generate();
}
// then we advance the leaf level
setHasNext(advance(depth, false));
+ return result;
}
private boolean advance(int depth, boolean first)
@@ -481,15 +567,33 @@ public abstract class PartitionIterator implements Iterator<Row>
}
}
+ private Pair<int[], Object[]> randomBound(int clusteringComponentDepth)
+ {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ int[] position = new int[clusteringComponentDepth];
+ Object[] bound = new Object[clusteringComponentDepth];
+ position[0] = rnd.nextInt(clusteringComponents[0].size());
+ bound[0] = Iterables.get(clusteringComponents[0], position[0]);
+ for (int d = 1 ; d < clusteringComponentDepth ; d++)
+ {
+ fill(d);
+ position[d] = rnd.nextInt(clusteringComponents[d].size());
+ bound[d] = Iterables.get(clusteringComponents[d], position[d]);
+ }
+ for (int d = 1 ; d < clusteringComponentDepth ; d++)
+ clusteringComponents[d].clear();
+ return Pair.create(position, bound);
+ }
+
// generate the clustering components for the provided depth; requires preceding components
// to have been generated and their seeds populated into clusteringSeeds
void fill(int depth)
{
- long seed = clusteringSeeds[depth - 1];
+ long seed = depth == 0 ? idseed : clusteringSeeds[depth - 1];
Generator gen = generator.clusteringComponents.get(depth);
gen.setSeed(seed);
- clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+ clusteringSeeds[depth] = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, seed);
}
// generate the clustering components into the queue
@@ -501,7 +605,7 @@ public abstract class PartitionIterator implements Iterator<Row>
return;
}
- switch (this.generator.order)
+ switch (order)
{
case SORTED:
if (Comparable.class.isAssignableFrom(generator.clazz))
@@ -511,7 +615,7 @@ public abstract class PartitionIterator implements Iterator<Row>
tosort.add(generator.generate());
Collections.sort((List<Comparable>) (List<?>) tosort);
for (int i = 0 ; i < count ; i++)
- if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+ if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(tosort.get(i)) < 0)
queue.add(tosort.get(i));
break;
}
@@ -556,8 +660,7 @@ public abstract class PartitionIterator implements Iterator<Row>
{
if (!hasNext())
throw new NoSuchElementException();
- advance();
- return row;
+ return advance();
}
public boolean finishedPartition()
@@ -646,5 +749,4 @@ public abstract class PartitionIterator implements Iterator<Row>
{
return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Row.java b/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
index 421dbbf..bf547c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Row.java
@@ -23,9 +23,16 @@ package org.apache.cassandra.stress.generate;
public class Row
{
+ private static final Object[] EMPTY_ROW_DATA = new Object[0];
- final Object[] partitionKey;
- final Object[] row;
+ public final Object[] partitionKey;
+ public final Object[] row;
+
+ public Row(Object[] partitionKey)
+ {
+ this.partitionKey = partitionKey;
+ this.row = EMPTY_ROW_DATA;
+ }
public Row(Object[] partitionKey, Object[] row)
{
@@ -40,4 +47,8 @@ public class Row
return row[column];
}
+ public Row copy()
+ {
+ return new Row(partitionKey.clone(), row.clone());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 10191a6..a20272a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -21,19 +21,14 @@ package org.apache.cassandra.stress.operations;
*/
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.util.Timing;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.util.Timer;
public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
@@ -47,7 +42,7 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
this.clustering = clustering;
}
- protected abstract Operation get(Timer timer, PartitionGenerator generator, T key);
+ protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key);
protected abstract PartitionGenerator newGenerator();
public OpDistribution get(Timing timing, int sampleCount)
@@ -55,8 +50,11 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
PartitionGenerator generator = newGenerator();
List<Pair<Operation, Double>> operations = new ArrayList<>();
for (Map.Entry<T, Double> ratio : ratios.entrySet())
- operations.add(new Pair<>(get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey()),
- ratio.getValue()));
+ {
+ List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey());
+ for (Operation op : ops)
+ operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
+ }
return new SampledOpDistribution(new EnumeratedDistribution<>(operations), clustering.get());
}
@@ -77,7 +75,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
{
public OpDistribution get(Timing timing, int sampleCount)
{
- return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey()));
+ List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey());
+ if (ops.size() == 1)
+ return new FixedOpDistribution(ops.get(0));
+ List<Pair<Operation, Double>> ratios = new ArrayList<>();
+ for (Operation op : ops)
+ ratios.add(new Pair<>(op, 1d / ops.size()));
+ return new SampledOpDistribution(new EnumeratedDistribution<Operation>(ratios), new DistributionFixed(1));
}
public String desc()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index a915d93..ef4d53f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -32,7 +32,6 @@ import com.datastax.driver.core.Statement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
@@ -44,7 +43,7 @@ public class SchemaInsert extends SchemaStatement
public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
{
- super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio), statement, thriftId, cl, ValidationType.NOT_FAIL);
+ super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio), statement, thriftId, cl);
this.batchType = batchType;
}
@@ -85,14 +84,7 @@ public class SchemaInsert extends SchemaStatement
stmt = batch;
}
- try
- {
- validate(client.getSession().execute(stmt));
- }
- catch (ClassCastException e)
- {
- e.printStackTrace();
- }
+ client.getSession().execute(stmt);
}
return true;
}
@@ -113,7 +105,7 @@ public class SchemaInsert extends SchemaStatement
{
while (iterator.hasNext())
{
- validate(client.execute_prepared_cql3_query(thriftId, iterator.getToken(), thriftRowArgs(iterator.next()), settings.command.consistencyLevel));
+ client.execute_prepared_cql3_query(thriftId, iterator.getToken(), thriftRowArgs(iterator.next()), settings.command.consistencyLevel);
rowCount += 1;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index a51bac4..58f5307 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -33,7 +33,6 @@ import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
@@ -52,9 +51,9 @@ public class SchemaQuery extends SchemaStatement
final Object[][] randomBuffer;
final Random random = new Random();
- public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ValidationType validationType, ArgSelect argSelect)
+ public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect)
{
- super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl, validationType);
+ super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl);
this.argSelect = argSelect;
randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
}
@@ -71,7 +70,6 @@ public class SchemaQuery extends SchemaStatement
public boolean run() throws Exception
{
ResultSet rs = client.getSession().execute(bindArgs());
- validate(rs);
rowCount = rs.all().size();
partitionCount = Math.min(1, rowCount);
return true;
@@ -90,7 +88,6 @@ public class SchemaQuery extends SchemaStatement
public boolean run() throws Exception
{
CqlResult rs = client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftArgs(), ThriftConversion.toThrift(cl));
- validate(rs);
rowCount = rs.getRowsSize();
partitionCount = Math.min(1, rowCount);
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 4305151..e90de23 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.Row;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.CqlResult;
@@ -46,18 +45,16 @@ public abstract class SchemaStatement extends Operation
final PreparedStatement statement;
final Integer thriftId;
final ConsistencyLevel cl;
- final ValidationType validationType;
final int[] argumentIndex;
final Object[] bindBuffer;
public SchemaStatement(Timer timer, StressSettings settings, DataSpec spec,
- PreparedStatement statement, Integer thriftId, ConsistencyLevel cl, ValidationType validationType)
+ PreparedStatement statement, Integer thriftId, ConsistencyLevel cl)
{
super(timer, settings, spec);
this.statement = statement;
this.thriftId = thriftId;
this.cl = cl;
- this.validationType = validationType;
argumentIndex = new int[statement.getVariables().size()];
bindBuffer = new Object[argumentIndex.length];
int i = 0;
@@ -86,36 +83,6 @@ public abstract class SchemaStatement extends Operation
return args;
}
- void validate(ResultSet rs)
- {
- switch (validationType)
- {
- case NOT_FAIL:
- return;
- case NON_ZERO:
- if (rs.all().size() == 0)
- throw new IllegalStateException("Expected non-zero results");
- break;
- default:
- throw new IllegalStateException("Unsupported validation type");
- }
- }
-
- void validate(CqlResult rs)
- {
- switch (validationType)
- {
- case NOT_FAIL:
- return;
- case NON_ZERO:
- if (rs.getRowsSize() == 0)
- throw new IllegalStateException("Expected non-zero results");
- break;
- default:
- throw new IllegalStateException("Unsupported validation type");
- }
- }
-
@Override
public void run(SimpleClient client) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
new file mode 100644
index 0000000..2cbdcb4
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -0,0 +1,359 @@
+package org.apache.cassandra.stress.operations.userdefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.utils.Pair;
+import org.apache.thrift.TException;
+
+public class ValidatingSchemaQuery extends Operation
+{
+ final Random random = new Random(); // not referenced anywhere else in this class
+ private Pair<Row, Row> bounds; // assigned in reset(); read by bind() and thriftArgs()
+
+ final int clusteringComponents; // passed to PartitionIterator.resetToBounds() in reset()
+ final ValidatingStatement[] statements; // statement variants; each Runner picks one index at random
+ final ConsistencyLevel cl; // set on every prepared statement in the constructor and used for Thrift calls
+ final int[] argumentIndex; // partition-generator index of each bind variable of statements[0]
+ final Object[] bindBuffer; // reusable argument array filled by bind()
+
+ @Override
+ public void run(SimpleClient client) throws IOException // validation is not implemented for SimpleClient
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private ValidatingSchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, ValidatingStatement[] statements, ConsistencyLevel cl, int clusteringComponents) // private: instances are built through Factory.create()
+ {
+ super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), 1));
+ this.statements = statements;
+ this.cl = cl;
+ argumentIndex = new int[statements[0].statement.getVariables().size()]; // sized from the first statement's bind variables
+ bindBuffer = new Object[argumentIndex.length];
+ int i = 0;
+ for (ColumnDefinitions.Definition definition : statements[0].statement.getVariables())
+ argumentIndex[i++] = spec.partitionGenerator.indexOf(definition.getName()); // map each bind variable name to its generator index
+
+ for (ValidatingStatement statement : statements)
+ statement.statement.setConsistencyLevel(JavaDriverClient.from(cl)); // same consistency level for every statement variant
+ this.clusteringComponents = clusteringComponents;
+ }
+
+ protected boolean reset(Seed seed, PartitionIterator iterator) // stores the bounds pair returned by the iterator for later binding
+ {
+ bounds = iterator.resetToBounds(seed, clusteringComponents);
+ return true; // always reports success
+ }
+
+ abstract class Runner implements RunOp // state shared by the Java-driver and Thrift runners
+ {
+ int partitionCount; // reported through partitionCount()
+ int rowCount; // reported through rowCount()
+ final PartitionIterator iter; // supplies the expected rows
+ final int statementIndex; // which entry of statements this run executes
+
+ protected Runner(PartitionIterator iter)
+ {
+ this.iter = iter;
+ statementIndex = ThreadLocalRandom.current().nextInt(statements.length); // pick one statement variant at random
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return partitionCount;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return rowCount;
+ }
+ }
+
+ /** Validates one query through the Java driver against the rows the partition iterator generates. */
+ private class JavaDriverRun extends Runner
+ {
+ final JavaDriverClient client;
+
+ private JavaDriverRun(JavaDriverClient client, PartitionIterator iter)
+ {
+ super(iter);
+ this.client = client;
+ }
+
+ /**
+ * Executes the selected statement and compares every returned column with the expected value.
+ * @return true only if all expected rows matched and the result set held no further rows
+ */
+ public boolean run() throws Exception
+ {
+ ResultSet rs = client.getSession().execute(bind(statementIndex));
+ // position of each result column within the partition generator
+ int[] valueIndex = new int[rs.getColumnDefinitions().size()];
+ {
+ int i = 0;
+ for (ColumnDefinitions.Definition definition : rs.getColumnDefinitions())
+ valueIndex[i++] = spec.partitionGenerator.indexOf(definition.getName());
+ }
+
+ rowCount = 0;
+ Iterator<com.datastax.driver.core.Row> results = rs.iterator();
+ // exclusive lower bound: discard the first generated row before comparing
+ if (!statements[statementIndex].inclusiveStart && iter.hasNext())
+ iter.next();
+ while (iter.hasNext())
+ {
+ Row expectedRow = iter.next();
+ // exclusive upper bound: stop before comparing the last generated row
+ if (!statements[statementIndex].inclusiveEnd && !iter.hasNext())
+ break;
+
+ if (!results.hasNext())
+ return false;
+
+ rowCount++;
+ com.datastax.driver.core.Row actualRow = results.next();
+ for (int i = 0 ; i < actualRow.getColumnDefinitions().size() ; i++)
+ {
+ Object expectedValue = expectedRow.get(valueIndex[i]);
+ Object actualValue = spec.partitionGenerator.convert(valueIndex[i], actualRow.getBytesUnsafe(i));
+ if (!expectedValue.equals(actualValue))
+ return false;
+ }
+ }
+ partitionCount = Math.min(1, rowCount);
+ if (!rs.isExhausted())
+ return false;
+ return true;
+ }
+ }
+
+ private class ThriftRun extends Runner // validates one query through the Thrift prepared-statement API
+ {
+ final ThriftClient client;
+
+ private ThriftRun(ThriftClient client, PartitionIterator iter)
+ {
+ super(iter);
+ this.client = client;
+ }
+
+ public boolean run() throws Exception // returns true only when the returned rows match the expected rows exactly
+ {
+ CqlResult rs = client.execute_prepared_cql3_query(statements[statementIndex].thriftId, partitions.get(0).getToken(), thriftArgs(), ThriftConversion.toThrift(cl));
+ int[] valueIndex = new int[rs.getSchema().name_types.size()];
+ for (int i = 0 ; i < valueIndex.length ; i++)
+ valueIndex[i] = spec.partitionGenerator.indexOf(rs.fieldForId(i).getFieldName()); // NOTE(review): fieldForId looks like a Thrift struct-field lookup, not a result-column lookup - confirm
+ int r = 0;
+ rowCount = 0; // reset as JavaDriverRun.run() does, so a repeated invocation does not accumulate
+ if (!statements[statementIndex].inclusiveStart && iter.hasNext())
+ iter.next(); // exclusive lower bound: discard the first generated row
+ while (iter.hasNext())
+ {
+ Row expectedRow = iter.next();
+ if (!statements[statementIndex].inclusiveEnd && !iter.hasNext())
+ break; // exclusive upper bound: the last generated row is not compared
+
+ if (r == rs.getRowsSize()) // bound by the row list actually read below, not the integer-result field
+ return false;
+
+ rowCount++;
+ CqlRow actualRow = rs.getRows().get(r++);
+ for (int i = 0 ; i < actualRow.getColumnsSize() ; i++)
+ {
+ ByteBuffer expectedValue = spec.partitionGenerator.convert(valueIndex[i], expectedRow.get(valueIndex[i]));
+ ByteBuffer actualValue = actualRow.getColumns().get(i).value;
+ if (!expectedValue.equals(actualValue))
+ return false;
+ }
+ }
+ partitionCount = Math.min(1, rowCount);
+ return r == rs.getRowsSize(); // surplus rows fail validation even when assertions are disabled
+ }
+ }
+
+ BoundStatement bind(int statementIndex) // binds partition key, then lower-bound row values, then upper-bound row values
+ {
+ int pkc = bounds.left.partitionKey.length; // number of partition key values
+ System.arraycopy(bounds.left.partitionKey, 0, bindBuffer, 0, pkc);
+ int ccc = bounds.left.row.length; // number of values in the lower-bound row
+ System.arraycopy(bounds.left.row, 0, bindBuffer, pkc, ccc);
+ System.arraycopy(bounds.right.row, 0, bindBuffer, pkc + ccc, ccc); // upper bound copies the same number of values as the lower bound
+ return statements[statementIndex].statement.bind(bindBuffer);
+ }
+
+ List<ByteBuffer> thriftArgs() // serialized arguments in the order bind() uses: partition key, lower bound, upper bound
+ {
+ List<ByteBuffer> args = new ArrayList<>();
+ int pkc = bounds.left.partitionKey.length;
+ for (int i = 0 ; i < pkc ; i++)
+ args.add(spec.partitionGenerator.convert(-i, bounds.left.partitionKey[i])); // NOTE(review): -i is 0 for the first key, the same index the loops below pass for the first row value - confirm convert()'s index convention
+ int ccc = bounds.left.row.length;
+ for (int i = 0 ; i < ccc ; i++)
+ args.add(spec.partitionGenerator.convert(i, bounds.left.get(i))); // lower-bound values
+ for (int i = 0 ; i < ccc ; i++)
+ args.add(spec.partitionGenerator.convert(i, bounds.right.get(i))); // upper-bound values
+ return args;
+ }
+
+ @Override
+ public void run(JavaDriverClient client) throws IOException // validates the first partition through the Java driver
+ {
+ timeWithRetry(new JavaDriverRun(client, partitions.get(0)));
+ }
+
+ @Override
+ public void run(ThriftClient client) throws IOException // validates the first partition through Thrift
+ {
+ timeWithRetry(new ThriftRun(client, partitions.get(0)));
+ }
+
+ public static class Factory // holds one set of prepared statements and its clustering depth, and builds queries from them
+ {
+ final ValidatingStatement[] statements; // handed unchanged to every query this factory creates
+ final int clusteringComponents; // 0 for the whole-partition query, depth + 1 for range queries (see create(TableMetadata, StressSettings))
+
+ public Factory(ValidatingStatement[] statements, int clusteringComponents)
+ {
+ this.statements = statements;
+ this.clusteringComponents = clusteringComponents;
+ }
+
+ public ValidatingSchemaQuery create(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, ConsistencyLevel cl) // builds a query bound to the given timer, generator and seed manager
+ {
+ return new ValidatingSchemaQuery(timer, settings, generator, seedManager, statements, cl, clusteringComponents);
+ }
+ }
+
+ public static List<Factory> create(TableMetadata metadata, StressSettings settings) // one Factory for the whole-partition query plus one per clustering-column prefix depth
+ {
+ List<Factory> factories = new ArrayList<>();
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ sb.append("SELECT * FROM ");
+ sb.append(metadata.getName());
+ sb.append(" WHERE");
+ for (ColumnMetadata pk : metadata.getPartitionKey()) // equality predicate on every partition key column
+ {
+ sb.append(first ? " " : " AND ");
+ sb.append(pk.getName());
+ sb.append(" = ?");
+ first = false;
+ }
+ String base = sb.toString(); // shared prefix of every statement built below
+
+ factories.add(new Factory(new ValidatingStatement[] { prepare(settings, base, true, true) }, 0)); // whole-partition query: no clustering bounds, both ends flagged inclusive
+
+ int maxDepth = metadata.getClusteringColumns().size() - 1; // -1 when there are no clustering columns, so the loop below is skipped
+ for (int depth = 0 ; depth <= maxDepth ; depth++)
+ {
+ StringBuilder cc = new StringBuilder(); // tuple of clustering column names, e.g. (a,b)
+ StringBuilder arg = new StringBuilder(); // matching tuple of bind markers, e.g. (?,?)
+ cc.append("("); arg.append("(");
+ for (int d = 0 ; d <= depth ; d++)
+ {
+ if (d > 0) { cc.append(","); arg.append(","); }
+ cc.append(metadata.getClusteringColumns().get(d).getName());
+ arg.append("?");
+ }
+ cc.append(")"); arg.append(")");
+
+ ValidatingStatement[] statements = new ValidatingStatement[depth < maxDepth ? 1 : 4]; // only the deepest prefix gets all four inclusive/exclusive combinations
+ int i = 0;
+ for (boolean incLb : depth < maxDepth ? new boolean[] { true } : new boolean[] { true, false } ) // shallower prefixes: inclusive lower bound only
+ {
+ for (boolean incUb : depth < maxDepth ? new boolean[] { false } : new boolean[] { true, false } ) // shallower prefixes: exclusive upper bound only
+ {
+ String lb = incLb ? ">=" : ">";
+ String ub = incUb ? "<=" : "<";
+ sb.setLength(0); // reuse the builder for each statement
+ sb.append(base);
+ sb.append(" AND ");
+ sb.append(cc);
+ sb.append(lb);
+ sb.append(arg);
+ sb.append(" AND ");
+ sb.append(cc);
+ sb.append(ub);
+ sb.append(arg);
+ statements[i++] = prepare(settings, sb.toString(), incLb, incUb);
+ }
+ }
+ factories.add(new Factory(statements, depth + 1)); // depth + 1 = number of clustering columns in the bound tuple
+ }
+
+ return factories;
+ }
+
+ private static class ValidatingStatement // one prepared query plus the inclusiveness of its two bounds
+ {
+ final PreparedStatement statement; // Java driver handle
+ final Integer thriftId; // id returned by the Thrift prepare call for the same CQL
+ final boolean inclusiveStart; // false makes the runners skip the first generated row
+ final boolean inclusiveEnd; // false makes the runners stop before the last generated row
+ private ValidatingStatement(PreparedStatement statement, Integer thriftId, boolean inclusiveStart, boolean inclusiveEnd)
+ {
+ this.statement = statement;
+ this.thriftId = thriftId;
+ this.inclusiveStart = inclusiveStart;
+ this.inclusiveEnd = inclusiveEnd;
+ }
+ }
+
+ private static ValidatingStatement prepare(StressSettings settings, String cql, boolean incLb, boolean incUb) // prepares the same CQL on both the Java driver and Thrift clients
+ {
+ JavaDriverClient jclient = settings.getJavaDriverClient();
+ ThriftClient tclient = settings.getThriftClient();
+ PreparedStatement statement = jclient.prepare(cql);
+ try
+ {
+ Integer thriftId = tclient.prepare_cql3_query(cql, Compression.NONE);
+ return new ValidatingStatement(statement, thriftId, incLb, incUb);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e); // rethrown unchecked with the Thrift exception as cause
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 3a1d552..861b1a4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -21,10 +21,7 @@ package org.apache.cassandra.stress.settings;
*/
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.DistributionFactory;
@@ -58,9 +55,9 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
final SeedManager seeds = new SeedManager(settings);
return new SampledOpDistributionFactory<Command>(ratios, clustering)
{
- protected Operation get(Timer timer, PartitionGenerator generator, Command key)
+ protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key)
{
- return PredefinedOperation.operation(key, timer, generator, seeds, settings, add);
+ return Collections.singletonList(PredefinedOperation.operation(key, timer, generator, seeds, settings, add));
}
protected PartitionGenerator newGenerator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index d4e43cf..5228828 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.stress.settings;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -75,11 +76,13 @@ public class SettingsCommandUser extends SettingsCommand
final SeedManager seeds = new SeedManager(settings);
return new SampledOpDistributionFactory<String>(ratios, clustering)
{
- protected Operation get(Timer timer, PartitionGenerator generator, String key)
+ protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key)
{
if (key.equalsIgnoreCase("insert"))
- return profile.getInsert(timer, generator, seeds, settings);
- return profile.getQuery(key, timer, generator, seeds, settings);
+ return Collections.singletonList(profile.getInsert(timer, generator, seeds, settings));
+ if (key.equalsIgnoreCase("validate"))
+ return profile.getValidate(timer, generator, seeds, settings);
+ return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings));
}
protected PartitionGenerator newGenerator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bee990c/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java b/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
deleted file mode 100644
index 710b717..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/settings/ValidationType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.cassandra.stress.settings;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-public enum ValidationType
-{
-
- NOT_FAIL, NON_ZERO, SUBSET, EQUAL
-
-}