You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2014/03/13 22:22:03 UTC
[02/15] git commit: Fix stress to do proper counter reads. Patch by
Benedict Elliott Smith; reviewed by Pavel Yaskevich for CASSANDRA-6835
Fix stress to do proper counter reads
Patch by Benedict Elliott Smith; reviewed by Pavel Yaskevich for CASSANDRA-6835
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e2c6105
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e2c6105
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e2c6105
Branch: refs/heads/trunk
Commit: 3e2c610577fc1716e6ee41ef6af0976587344615
Parents: 948964b
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Wed Mar 12 16:57:04 2014 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Wed Mar 12 16:57:47 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/utils/FBUtilities.java | 2 +-
.../org/apache/cassandra/stress/Operation.java | 109 +++++++++++++------
.../apache/cassandra/stress/StressAction.java | 4 +-
.../cassandra/stress/generatedata/RowGen.java | 1 +
.../generatedata/RowGenDistributedSize.java | 7 ++
.../stress/operations/CqlCounterAdder.java | 17 ++-
.../stress/operations/CqlCounterGetter.java | 9 +-
.../operations/CqlIndexedRangeSlicer.java | 16 +--
.../stress/operations/CqlInserter.java | 10 +-
.../stress/operations/CqlOperation.java | 73 ++++++++-----
.../stress/operations/CqlRangeSlicer.java | 8 +-
.../cassandra/stress/operations/CqlReader.java | 12 +-
.../stress/operations/ThriftCounterAdder.java | 12 +-
.../stress/operations/ThriftCounterGetter.java | 15 +--
.../operations/ThriftIndexedRangeSlicer.java | 5 +-
.../stress/operations/ThriftInserter.java | 6 +-
.../stress/operations/ThriftMultiGetter.java | 3 +-
.../stress/operations/ThriftRangeSlicer.java | 3 +-
.../stress/operations/ThriftReader.java | 16 +--
.../cassandra/stress/settings/Command.java | 72 +++++++-----
.../cassandra/stress/settings/Option.java | 1 +
.../stress/settings/OptionDataGen.java | 5 +
.../stress/settings/OptionDistribution.java | 13 ++-
.../cassandra/stress/settings/OptionMulti.java | 39 ++++++-
.../stress/settings/OptionReplication.java | 2 +-
.../cassandra/stress/settings/OptionSimple.java | 2 +-
.../stress/settings/SettingsColumn.java | 29 +++--
.../stress/settings/SettingsCommand.java | 30 ++---
.../stress/settings/SettingsCommandMixed.java | 46 ++++----
.../stress/settings/SettingsCommandMulti.java | 90 ---------------
.../cassandra/stress/settings/SettingsKey.java | 2 +-
.../cassandra/stress/settings/SettingsMisc.java | 4 +-
.../cassandra/stress/settings/SettingsMode.java | 40 +++++--
.../stress/settings/SettingsSchema.java | 7 +-
.../stress/settings/SettingsTransport.java | 2 +-
.../stress/settings/StressSettings.java | 30 +++--
.../cassandra/stress/util/JavaDriverClient.java | 4 +-
38 files changed, 410 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 912e6af..50e7da3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
* Fix ClassCastException for compact table with composites (CASSANDRA-6738)
* Fix potentially repairing with wrong nodes (CASSANDRA-6808)
* Change caching option syntax (CASSANDRA-6745)
+ * Fix stress to do proper counter reads (CASSANDRA-6835)
Merged from 2.0:
* Avoid race-prone second "scrub" of system keyspace (CASSANDRA-6797)
* Pool CqlRecordWriter clients by inetaddress rather than Range
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 0a94cc0..7b574e2 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -370,7 +370,7 @@ public class FBUtilities
in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties");
if (in == null)
{
- return "Unknown";
+ return System.getProperty("cassandra.releaseVersion", "Unknown");
}
Properties props = new Properties();
props.load(in);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 4519b19..33cca17 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,27 +19,16 @@ package org.apache.cassandra.stress;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.cassandra.stress.generatedata.Distribution;
import org.apache.cassandra.stress.generatedata.KeyGen;
import org.apache.cassandra.stress.generatedata.RowGen;
-import org.apache.cassandra.stress.operations.CqlCounterAdder;
-import org.apache.cassandra.stress.operations.CqlCounterGetter;
-import org.apache.cassandra.stress.operations.CqlIndexedRangeSlicer;
-import org.apache.cassandra.stress.operations.CqlInserter;
-import org.apache.cassandra.stress.operations.CqlMultiGetter;
-import org.apache.cassandra.stress.operations.CqlRangeSlicer;
-import org.apache.cassandra.stress.operations.CqlReader;
-import org.apache.cassandra.stress.operations.ThriftCounterAdder;
-import org.apache.cassandra.stress.operations.ThriftCounterGetter;
-import org.apache.cassandra.stress.operations.ThriftIndexedRangeSlicer;
-import org.apache.cassandra.stress.operations.ThriftInserter;
-import org.apache.cassandra.stress.operations.ThriftMultiGetter;
-import org.apache.cassandra.stress.operations.ThriftRangeSlicer;
-import org.apache.cassandra.stress.operations.ThriftReader;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.CqlVersion;
import org.apache.cassandra.stress.settings.SettingsCommandMixed;
@@ -49,6 +38,8 @@ import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -79,6 +70,7 @@ public abstract class Operation
public final Command type;
public final KeyGen keyGen;
public final RowGen rowGen;
+ public final Distribution counteradd;
public final List<ColumnParent> columnParents;
public final StressMetrics metrics;
public final SettingsCommandMixed.CommandSelector commandSelector;
@@ -99,19 +91,12 @@ public abstract class Operation
commandSelector = null;
substates = null;
}
+ counteradd = settings.command.add.get();
this.settings = settings;
this.keyGen = settings.keys.newKeyGen();
this.rowGen = settings.columns.newRowGen();
this.metrics = metrics;
- if (!settings.columns.useSuperColumns)
- columnParents = Collections.singletonList(new ColumnParent(settings.schema.columnFamily));
- else
- {
- ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
- for (int i = 0 ; i < cp.length ; i++)
- cp[i] = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes("S" + i));
- columnParents = Arrays.asList(cp);
- }
+ this.columnParents = columnParents(type, settings);
}
private State(Command type, State copy)
@@ -120,13 +105,29 @@ public abstract class Operation
this.timer = copy.timer;
this.rowGen = copy.rowGen;
this.keyGen = copy.keyGen;
- this.columnParents = copy.columnParents;
+ this.columnParents = columnParents(type, copy.settings);
this.metrics = copy.metrics;
this.settings = copy.settings;
+ this.counteradd = copy.counteradd;
this.substates = null;
this.commandSelector = null;
}
+ private List<ColumnParent> columnParents(Command type, StressSettings settings)
+ {
+ if (!settings.columns.useSuperColumns)
+ return Collections.singletonList(new ColumnParent(type.table));
+ else
+ {
+ ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
+ for (int i = 0 ; i < cp.length ; i++)
+ cp[i] = new ColumnParent(type.supertable).setSuper_column(ByteBufferUtil.bytes("S" + i));
+ return Arrays.asList(cp);
+ }
+ }
+
+
+
public boolean isCql3()
{
return settings.mode.cqlVersion == CqlVersion.CQL3;
@@ -172,6 +173,53 @@ public abstract class Operation
return state.rowGen.generate(index, key);
}
+ private int sliceStart(int count)
+ {
+ if (count == state.settings.columns.maxColumnsPerKey)
+ return 0;
+ return 1 + ThreadLocalRandom.current().nextInt(state.settings.columns.maxColumnsPerKey - count);
+ }
+
+ protected SlicePredicate slicePredicate()
+ {
+ final SlicePredicate predicate = new SlicePredicate();
+ if (state.settings.columns.slice)
+ {
+ int count = state.rowGen.count(index);
+ int start = sliceStart(count);
+ predicate.setSlice_range(new SliceRange()
+ .setStart(state.settings.columns.names.get(start))
+ .setFinish(new byte[] {})
+ .setReversed(false)
+ .setCount(count)
+ );
+ }
+ else
+ predicate.setColumn_names(randomNames());
+ return predicate;
+ }
+
+ protected List<ByteBuffer> randomNames()
+ {
+ int count = state.rowGen.count(index);
+ List<ByteBuffer> src = state.settings.columns.names;
+ if (count == src.size())
+ return src;
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ List<ByteBuffer> r = new ArrayList<>();
+ int c = 0, o = 0;
+ while (c < count && count + o < src.size())
+ {
+ int leeway = src.size() - (count + o);
+ int spreadover = count - c;
+ o += Math.round(rnd.nextDouble() * (leeway / (double) spreadover));
+ r.add(src.get(o + c++));
+ }
+ while (c < count)
+ r.add(src.get(o + c++));
+ return r;
+ }
+
/**
* Run operation
* @param client Cassandra Thrift client connection
@@ -213,10 +261,11 @@ public abstract class Operation
if (!success)
{
- error(String.format("Operation [%d] x%d key %s %s%n",
+ error(String.format("Operation [%d] x%d key %s (0x%s) %s%n",
index,
tries,
run.key(),
+ ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(run.key())),
(exceptionMessage == null)
? "Data returned was not validated"
: "Error executing: " + exceptionMessage));
@@ -239,14 +288,4 @@ public abstract class Operation
System.err.println(message);
}
- public static ByteBuffer getColumnNameBytes(int i)
- {
- return ByteBufferUtil.bytes("C" + i);
- }
-
- public static String getColumnName(int i)
- {
- return "C" + i;
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 94824ec..e7cdd0b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -87,7 +87,7 @@ public class StressAction implements Runnable
warmup(subtype, command);
return;
case MULTI:
- int keysAtOnce = ((SettingsCommandMulti) command).keysAtOnce;
+ int keysAtOnce = command.keysAtOnce;
iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce));
break;
default:
@@ -298,6 +298,8 @@ public class StressAction implements Runnable
case SIMPLE_NATIVE:
op.run(sclient);
break;
+ case THRIFT:
+ case THRIFT_SMART:
default:
op.run(tclient);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
index cb0dc1c..9c6ca43 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
@@ -46,6 +46,7 @@ public abstract class RowGen
// these byte[] may be re-used
abstract List<ByteBuffer> getColumns(long operationIndex);
+ abstract public int count(long operationIndex);
abstract public boolean isDeterministic();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
index eecbc7e..fffad2f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
@@ -51,6 +51,8 @@ public class RowGenDistributedSize extends RowGen
this.sizeDistribution = sizeDistribution;
ret = new ByteBuffer[(int) countDistribution.maxValue()];
sizes = new int[ret.length];
+ // TODO: should keep it deterministic in event that count distribution is not, but size and dataGen are, so that
+ // we simply need to generate the correct selection of columns
this.isDeterministic = dataGen.isDeterministic() && countDistribution.maxValue() == countDistribution.minValue()
&& sizeDistribution.minValue() == sizeDistribution.maxValue();
}
@@ -100,6 +102,11 @@ public class RowGenDistributedSize extends RowGen
return Arrays.asList(ret).subList(0, count);
}
+ public int count(long operationIndex)
+ {
+ return (int) countDistribution.next();
+ }
+
@Override
public boolean isDeterministic()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index aae99b5..9a8c37d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -22,9 +22,12 @@ package org.apache.cassandra.stress.operations;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
public class CqlCounterAdder extends CqlOperation<Integer>
{
public CqlCounterAdder(State state, long idx)
@@ -35,7 +38,7 @@ public class CqlCounterAdder extends CqlOperation<Integer>
@Override
protected String buildQuery()
{
- String counterCF = state.isCql2() ? "Counter1" : "Counter3";
+ String counterCF = state.isCql2() ? state.type.table : "Counter3";
StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
@@ -50,20 +53,24 @@ public class CqlCounterAdder extends CqlOperation<Integer>
if (i > 0)
query.append(",");
- query.append('C').append(i).append("=C").append(i).append("+1");
+ query.append('C').append(i).append("=C").append(i).append("+?");
}
query.append(" WHERE KEY=?");
return query.toString();
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
- return Collections.singletonList(ByteBuffer.wrap(key));
+ final List<Object> list = new ArrayList<>();
+ for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
+ list.add(state.counteradd.next());
+ list.add(ByteBuffer.wrap(key));
+ return list;
}
@Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index 31fd20d..88d622e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -34,9 +34,9 @@ public class CqlCounterGetter extends CqlOperation<Integer>
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
- return Collections.singletonList(ByteBuffer.wrap(key));
+ return Collections.<Object>singletonList(ByteBuffer.wrap(key));
}
@Override
@@ -44,12 +44,13 @@ public class CqlCounterGetter extends CqlOperation<Integer>
{
StringBuilder query = new StringBuilder("SELECT ");
+ // TODO: obey slice/noslice option (instead of always slicing)
if (state.isCql2())
query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
else
query.append("*");
- String counterCF = state.isCql2() ? "Counter1" : "Counter3";
+ String counterCF = state.isCql2() ? state.type.table : "Counter3";
query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
@@ -60,7 +61,7 @@ public class CqlCounterGetter extends CqlOperation<Integer>
}
@Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index ff43322..c971844 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-import org.apache.cassandra.stress.settings.SettingsCommandMulti;
import org.apache.cassandra.utils.FBUtilities;
public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
@@ -40,7 +39,7 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
throw new UnsupportedOperationException();
}
@@ -55,14 +54,15 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
else
query.append("*");
- query.append(" FROM Standard1");
+ query.append(" FROM ");
+ query.append(wrapInQuotesIfRequired(state.type.table));
if (state.isCql2())
query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- final String columnName = getColumnName(1);
+ final String columnName = (state.settings.columns.namestrs.get(1));
query.append(" WHERE ").append(columnName).append("=?")
- .append(" AND KEY > ? LIMIT ").append(((SettingsCommandMulti)state.settings.command).keysAtOnce);
+ .append(" AND KEY > ? LIMIT ").append(state.settings.command.keysAtOnce);
return query.toString();
}
@@ -76,7 +76,7 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
int rowCount;
do
{
- List<ByteBuffer> params = Arrays.asList(value, ByteBuffer.wrap(minKey));
+ List<Object> params = Arrays.<Object>asList(value, ByteBuffer.wrap(minKey));
CqlRunOp<byte[][]> op = run(client, params, value, new String(value.array()));
byte[][] keys = op.result;
rowCount = keys.length;
@@ -88,7 +88,7 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
private final class IndexedRangeSliceRunOp extends CqlRunOpFetchKeys
{
- protected IndexedRangeSliceRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected IndexedRangeSliceRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
super(client, query, queryId, params, keyid, key);
}
@@ -101,7 +101,7 @@ public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
}
@Override
- protected CqlRunOp<byte[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<byte[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
return new IndexedRangeSliceRunOp(client, query, queryId, params, keyid, key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index 8d964f5..71cdadf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -33,12 +33,14 @@ public class CqlInserter extends CqlOperation<Integer>
public CqlInserter(State state, long idx)
{
super(state, idx);
+ if (state.settings.columns.useTimeUUIDComparator)
+ throw new IllegalStateException("Cannot use TimeUUID Comparator with CQL");
}
@Override
protected String buildQuery()
{
- StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
+ StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(state.type.table));
if (state.isCql2())
query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
@@ -69,9 +71,9 @@ public class CqlInserter extends CqlOperation<Integer>
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
- final ArrayList<ByteBuffer> queryParams = new ArrayList<>();
+ final ArrayList<Object> queryParams = new ArrayList<>();
final List<ByteBuffer> values = generateColumnValues(ByteBuffer.wrap(key));
queryParams.addAll(values);
queryParams.add(ByteBuffer.wrap(key));
@@ -79,7 +81,7 @@ public class CqlInserter extends CqlOperation<Integer>
}
@Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
index b17f520..5b27146 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -44,9 +45,9 @@ import org.apache.thrift.TException;
public abstract class CqlOperation<V> extends Operation
{
- protected abstract List<ByteBuffer> getQueryParameters(byte[] key);
+ protected abstract List<Object> getQueryParameters(byte[] key);
protected abstract String buildQuery();
- protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key);
+ protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key);
public CqlOperation(State state, long idx)
{
@@ -55,9 +56,11 @@ public abstract class CqlOperation<V> extends Operation
throw new IllegalStateException("Super columns are not implemented for CQL");
if (state.settings.columns.variableColumnCount)
throw new IllegalStateException("Variable column counts are not implemented for CQL");
+ if (state.settings.columns.useTimeUUIDComparator)
+ throw new IllegalStateException("Cannot use TimeUUID Comparator with CQL");
}
- protected CqlRunOp<V> run(final ClientWrapper client, final List<ByteBuffer> queryParams, final ByteBuffer key, final String keyid) throws IOException
+ protected CqlRunOp<V> run(final ClientWrapper client, final List<Object> queryParams, final ByteBuffer key, final String keyid) throws IOException
{
final CqlRunOp<V> op;
if (state.settings.mode.style == ConnectionStyle.CQL_PREPARED)
@@ -99,7 +102,7 @@ public abstract class CqlOperation<V> extends Operation
protected void run(final ClientWrapper client) throws IOException
{
final byte[] key = getKey().array();
- final List<ByteBuffer> queryParams = getQueryParameters(key);
+ final List<Object> queryParams = getQueryParameters(key);
run(client, queryParams, ByteBuffer.wrap(key), new String(key));
}
@@ -111,7 +114,7 @@ public abstract class CqlOperation<V> extends Operation
final int keyCount;
- protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, int keyCount)
+ protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key, int keyCount)
{
super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
this.keyCount = keyCount;
@@ -134,7 +137,7 @@ public abstract class CqlOperation<V> extends Operation
protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
{
- protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+ protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key)
{
super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
}
@@ -156,7 +159,7 @@ public abstract class CqlOperation<V> extends Operation
protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
{
- protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+ protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key)
{
super(client, query, queryId, KeysHandler.INSTANCE, params, id, key);
}
@@ -175,7 +178,7 @@ public abstract class CqlOperation<V> extends Operation
final List<List<ByteBuffer>> expect;
// a null value for an item in expect means we just check the row is present
- protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, List<List<ByteBuffer>> expect)
+ protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key, List<List<ByteBuffer>> expect)
{
super(client, query, queryId, RowsHandler.INSTANCE, params, id, key);
this.expect = expect;
@@ -209,13 +212,13 @@ public abstract class CqlOperation<V> extends Operation
final ClientWrapper client;
final String query;
final Object queryId;
- final List<ByteBuffer> params;
+ final List<Object> params;
final String id;
final ByteBuffer key;
final ResultHandler<V> handler;
V result;
- private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<ByteBuffer> params, String id, ByteBuffer key)
+ private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<Object> params, String id, ByteBuffer key)
{
this.client = client;
this.query = query;
@@ -287,8 +290,8 @@ public abstract class CqlOperation<V> extends Operation
protected interface ClientWrapper
{
Object createPreparedStatement(String cqlQuery) throws TException;
- <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
- <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
+ <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
+ <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
}
private final class JavaDriverWrapper implements ClientWrapper
@@ -300,14 +303,14 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
}
@Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
return handler.javaDriverHandler().apply(
client.executePrepared(
@@ -332,19 +335,19 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
}
@Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
{
return handler.thriftHandler().apply(
client.executePrepared(
(byte[]) preparedStatementId,
- queryParams,
+ toByteBufferParams(queryParams),
ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
}
@@ -365,7 +368,7 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
String formattedQuery = formatCqlQuery(query, queryParams, true);
return handler.simpleNativeHandler().apply(
@@ -374,11 +377,11 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
Integer id = (Integer) preparedStatementId;
return handler.simpleNativeHandler().apply(
- client.execute_prepared_cql3_query(id, key, queryParams, state.settings.command.consistencyLevel)
+ client.execute_prepared_cql3_query(id, key, toByteBufferParams(queryParams), state.settings.command.consistencyLevel)
);
}
@@ -399,7 +402,7 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
String formattedQuery = formatCqlQuery(query, queryParams, false);
return handler.simpleNativeHandler().apply(
@@ -408,11 +411,11 @@ public abstract class CqlOperation<V> extends Operation
}
@Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
{
Integer id = (Integer) preparedStatementId;
return handler.simpleNativeHandler().apply(
- client.execute_prepared_cql_query(id, key, queryParams)
+ client.execute_prepared_cql_query(id, key, toByteBufferParams(queryParams))
);
}
@@ -647,7 +650,7 @@ public abstract class CqlOperation<V> extends Operation
* @param parms sequence of string query parameters
* @return formatted CQL query string
*/
- private static String formatCqlQuery(String query, List<ByteBuffer> parms, boolean isCql3)
+ private static String formatCqlQuery(String query, List<Object> parms, boolean isCql3)
{
int marker, position = 0;
StringBuilder result = new StringBuilder();
@@ -655,10 +658,14 @@ public abstract class CqlOperation<V> extends Operation
if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
return query;
- for (ByteBuffer parm : parms)
+ for (Object parm : parms)
{
result.append(query.substring(position, marker));
- result.append(getUnQuotedCqlBlob(parm, isCql3));
+ if (parm instanceof ByteBuffer)
+ result.append(getUnQuotedCqlBlob((ByteBuffer) parm, isCql3));
+ else if (parm instanceof Long)
+ result.append(parm.toString());
+ else throw new AssertionError();
position = marker + 1;
if (-1 == (marker = query.indexOf('?', position + 1)))
@@ -671,6 +678,20 @@ public abstract class CqlOperation<V> extends Operation
return result.toString();
}
+ private static List<ByteBuffer> toByteBufferParams(List<Object> params)
+ {
+ List<ByteBuffer> r = new ArrayList<>();
+ for (Object param : params)
+ {
+ if (param instanceof ByteBuffer)
+ r.add((ByteBuffer) param);
+ else if (param instanceof Long)
+ r.add(ByteBufferUtil.bytes((Long) param));
+ else throw new AssertionError();
+ }
+ return r;
+ }
+
protected String wrapInQuotesIfRequired(String string)
{
return state.settings.mode.cqlVersion == CqlVersion.CQL3
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index 76ba966..16cdff3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -33,16 +33,16 @@ public class CqlRangeSlicer extends CqlOperation<Integer>
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
- return Collections.singletonList(ByteBuffer.wrap(key));
+ return Collections.<Object>singletonList(ByteBuffer.wrap(key));
}
@Override
protected String buildQuery()
{
StringBuilder query = new StringBuilder("SELECT FIRST ").append(state.settings.columns.maxColumnsPerKey)
- .append(" ''..'' FROM ").append(state.settings.schema.columnFamily);
+ .append(" ''..'' FROM ").append(wrapInQuotesIfRequired(state.type.table));
if (state.isCql2())
query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
@@ -51,7 +51,7 @@ public class CqlRangeSlicer extends CqlOperation<Integer>
}
@Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 44da43f..4b8d69e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -40,7 +40,7 @@ public class CqlReader extends CqlOperation<ByteBuffer[][]>
{
StringBuilder query = new StringBuilder("SELECT ");
- if (state.settings.columns.names == null)
+ if (state.settings.columns.slice)
{
if (state.isCql2())
query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
@@ -57,7 +57,7 @@ public class CqlReader extends CqlOperation<ByteBuffer[][]>
}
}
- query.append(" FROM ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
+ query.append(" FROM ").append(wrapInQuotesIfRequired(state.type.table));
if (state.isCql2())
query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
@@ -66,21 +66,21 @@ public class CqlReader extends CqlOperation<ByteBuffer[][]>
}
@Override
- protected List<ByteBuffer> getQueryParameters(byte[] key)
+ protected List<Object> getQueryParameters(byte[] key)
{
if (state.settings.columns.names != null)
{
- final List<ByteBuffer> queryParams = new ArrayList<>();
+ final List<Object> queryParams = new ArrayList<>();
for (ByteBuffer name : state.settings.columns.names)
queryParams.add(name);
queryParams.add(ByteBuffer.wrap(key));
return queryParams;
}
- return Collections.singletonList(ByteBuffer.wrap(key));
+ return Collections.<Object>singletonList(ByteBuffer.wrap(key));
}
@Override
- protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+ protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
{
List<ByteBuffer> expectRow = state.rowGen.isDeterministic() ? generateColumnValues(key) : null;
return new CqlRunOpMatchResults(client, query, queryId, params, keyid, key, Arrays.asList(expectRow));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
index 26695a6..9bfe440 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.util.ThriftClient;
@@ -33,15 +35,13 @@ public class ThriftCounterAdder extends Operation
public ThriftCounterAdder(State state, long index)
{
super(state, index);
- if (state.settings.columns.variableColumnCount)
- throw new IllegalStateException("Variable column counts not supported for counters");
}
public void run(final ThriftClient client) throws IOException
{
List<CounterColumn> columns = new ArrayList<>();
- for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
- columns.add(new CounterColumn(getColumnNameBytes(i), 1L));
+ for (ByteBuffer name : randomNames())
+ columns.add(new CounterColumn(name, state.counteradd.next()));
Map<String, List<Mutation>> row;
if (state.settings.columns.useSuperColumns)
@@ -53,7 +53,7 @@ public class ThriftCounterAdder extends Operation
ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_super_column(csc);
mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
}
- row = Collections.singletonMap("SuperCounter1", mutations);
+ row = Collections.singletonMap(state.type.supertable, mutations);
}
else
{
@@ -63,7 +63,7 @@ public class ThriftCounterAdder extends Operation
ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
}
- row = Collections.singletonMap("Counter1", mutations);
+ row = Collections.singletonMap(state.type.table, mutations);
}
final ByteBuffer key = getKey();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
index 8567edd..6e36a28 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.util.ThriftClient;
@@ -31,20 +32,11 @@ public class ThriftCounterGetter extends Operation
public ThriftCounterGetter(State state, long index)
{
super(state, index);
- if (state.settings.columns.variableColumnCount)
- throw new IllegalStateException("Variable column counts not supported for counters");
}
public void run(final ThriftClient client) throws IOException
{
- SliceRange sliceRange = new SliceRange();
- // start/finish
- sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
- // reversed/count
- sliceRange.setReversed(false).setCount(state.settings.columns.maxColumnsPerKey);
- // initialize SlicePredicate with existing SliceRange
- final SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
-
+ final SlicePredicate predicate = slicePredicate();
final ByteBuffer key = getKey();
for (final ColumnParent parent : state.columnParents)
{
@@ -54,7 +46,8 @@ public class ThriftCounterGetter extends Operation
@Override
public boolean run() throws Exception
{
- return client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size() != 0;
+ List<?> r = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel);
+ return r != null && r.size() > 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
index 6eab209..8c8ec31 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.settings.SettingsCommandMulti;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,7 +51,7 @@ public class ThriftIndexedRangeSlicer extends Operation
final List<ByteBuffer> columns = generateColumnValues(getKey());
final ColumnParent parent = state.columnParents.get(0);
- final ByteBuffer columnName = getColumnNameBytes(1);
+ final ByteBuffer columnName = state.settings.columns.names.get(1);
final ByteBuffer value = columns.get(1); // only C1 column is indexed
IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
@@ -64,7 +63,7 @@ public class ThriftIndexedRangeSlicer extends Operation
final boolean first = minKey.length == 0;
final IndexClause clause = new IndexClause(Arrays.asList(expression),
ByteBuffer.wrap(minKey),
- ((SettingsCommandMulti) state.settings.command).keysAtOnce);
+ state.settings.command.keysAtOnce);
timeWithRetry(new RunOp()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
index b107f26..7077a95 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
@@ -53,7 +53,7 @@ public final class ThriftInserter extends Operation
ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
mutations.add(new Mutation().setColumn_or_supercolumn(column));
}
- row = Collections.singletonMap(state.settings.schema.columnFamily, mutations);
+ row = Collections.singletonMap(state.type.table, mutations);
}
else
{
@@ -64,7 +64,7 @@ public final class ThriftInserter extends Operation
final ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setSuper_column(s);
mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
}
- row = Collections.singletonMap("Super1", mutations);
+ row = Collections.singletonMap(state.settings.command.type.supertable, mutations);
}
final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
@@ -104,7 +104,7 @@ public final class ThriftInserter extends Operation
// TODO : consider randomly allocating column names in case where have fewer than max columns
// but need to think about implications for indexes / indexed range slicer / other knock on effects
for (int i = 0 ; i < values.size() ; i++)
- columns.add(new Column(getColumnNameBytes(i)));
+ columns.add(new Column(state.settings.columns.names.get(i)));
for (int i = 0 ; i < values.size() ; i++)
columns.get(i)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
index 01c7325..d8e0117 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.settings.SettingsCommandMulti;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -50,7 +49,7 @@ public final class ThriftMultiGetter extends Operation
)
);
- final List<ByteBuffer> keys = getKeys(((SettingsCommandMulti) state.settings.command).keysAtOnce);
+ final List<ByteBuffer> keys = getKeys(state.settings.command.keysAtOnce);
for (final ColumnParent parent : state.columnParents)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
index ce6c8cd..021c4e8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.settings.SettingsCommandMulti;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.KeyRange;
@@ -55,7 +54,7 @@ public final class ThriftRangeSlicer extends Operation
new KeyRange(state.settings.columns.maxColumnsPerKey)
.setStart_key(start)
.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- .setCount(((SettingsCommandMulti)state.settings.command).keysAtOnce);
+ .setCount(state.settings.command.keysAtOnce);
for (final ColumnParent parent : state.columnParents)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
index c50843f..dccf469 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.stress.Operation;
@@ -27,7 +26,6 @@ import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
public final class ThriftReader extends Operation
@@ -40,17 +38,7 @@ public final class ThriftReader extends Operation
public void run(final ThriftClient client) throws IOException
{
- final SlicePredicate predicate = new SlicePredicate();
- if (state.settings.columns.names == null)
- predicate.setSlice_range(new SliceRange()
- .setStart(new byte[] {})
- .setFinish(new byte[] {})
- .setReversed(false)
- .setCount(state.settings.columns.maxColumnsPerKey)
- );
- else // see CASSANDRA-3064 about why this is useful
- predicate.setColumn_names(state.settings.columns.names);
-
+ final SlicePredicate predicate = slicePredicate();
final ByteBuffer key = getKey();
final List<ByteBuffer> expect = state.rowGen.isDeterministic() ? generateColumnValues(key) : null;
for (final ColumnParent parent : state.columnParents)
@@ -63,6 +51,8 @@ public final class ThriftReader extends Operation
List<ColumnOrSuperColumn> row = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel);
if (expect == null)
return !row.isEmpty();
+ if (row == null)
+ return false;
if (!state.settings.columns.useSuperColumns)
{
if (row.size() != expect.size())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
index 60b65f7..d0350ad 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
@@ -27,54 +27,46 @@ import java.util.Map;
public enum Command
{
- READ(false,
- SettingsCommand.helpPrinter("read"),
+ READ(false, "Standard1", "Super1",
"Multiple concurrent reads - the cluster must first be populated by a write test",
CommandCategory.BASIC
),
- WRITE(true,
- SettingsCommand.helpPrinter("write"),
+ WRITE(true, "Standard1", "Super1",
"insert",
"Multiple concurrent writes against the cluster",
CommandCategory.BASIC
),
- MIXED(true,
- SettingsCommandMixed.helpPrinter(),
+ MIXED(true, null, null,
"Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test",
CommandCategory.MIXED
),
- RANGESLICE(false,
- SettingsCommandMulti.helpPrinter("range_slice"),
+ RANGESLICE(false, "Standard1", "Super1",
"Range slice queries - the cluster must first be populated by a write test",
CommandCategory.MULTI
),
- IRANGESLICE(false,
- SettingsCommandMulti.helpPrinter("indexed_range_slice"),
+ IRANGESLICE(false, "Standard1", "Super1",
"Range slice queries through a secondary index. The cluster must first be populated by a write test, with indexing enabled.",
- CommandCategory.MULTI
+ CommandCategory.BASIC
),
- READMULTI(false,
- SettingsCommandMulti.helpPrinter("readmulti"),
+ READMULTI(false, "Standard1", "Super1",
"multi_read",
"Multiple concurrent reads fetching multiple rows at once. The cluster must first be populated by a write test.",
CommandCategory.MULTI
),
- COUNTERWRITE(true,
- SettingsCommand.helpPrinter("counteradd"),
+ COUNTERWRITE(true, "Counter1", "SuperCounter1",
"counter_add",
"Multiple concurrent updates of counters.",
CommandCategory.BASIC
),
- COUNTERREAD(false,
- SettingsCommand.helpPrinter("counterread"),
+ COUNTERREAD(false, "Counter1", "SuperCounter1",
"counter_get",
"Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.",
CommandCategory.BASIC
),
- HELP(false, SettingsMisc.helpHelpPrinter(), "-?", "Print help for a command or option", null),
- PRINT(false, SettingsMisc.printHelpPrinter(), "Inspect the output of a distribution definition", null),
- LEGACY(false, Legacy.helpPrinter(), "Legacy support mode", null)
+ HELP(false, null, null, "-?", "Print help for a command or option", null),
+ PRINT(false, null, null, "Inspect the output of a distribution definition", null),
+ LEGACY(false, null, null, "Legacy support mode", null)
;
@@ -100,23 +92,49 @@ public enum Command
public final CommandCategory category;
public final String extraName;
public final String description;
- public final Runnable helpPrinter;
+ public final String table;
+ public final String supertable;
- Command(boolean updates, Runnable helpPrinter, String description, CommandCategory category)
+ Command(boolean updates, String table, String supertable, String description, CommandCategory category)
{
- this(updates, helpPrinter, null, description, category);
+ this(updates, table, supertable, null, description, category);
}
- Command(boolean updates, Runnable helpPrinter, String extra, String description, CommandCategory category)
+
+ Command(boolean updates, String table, String supertable, String extra, String description, CommandCategory category)
{
+ this.table = table;
+ this.supertable = supertable;
this.updates = updates;
this.category = category;
- this.helpPrinter = helpPrinter;
this.extraName = extra;
this.description = description;
}
+
public void printHelp()
{
- helpPrinter.run();
+ helpPrinter().run();
+ }
+
+ public final Runnable helpPrinter()
+ {
+ switch (this)
+ {
+ case PRINT:
+ return SettingsMisc.printHelpPrinter();
+ case HELP:
+ return SettingsMisc.helpHelpPrinter();
+ case LEGACY:
+ return Legacy.helpPrinter();
+ }
+ switch (category)
+ {
+ case BASIC:
+ case MULTI:
+ return SettingsCommand.helpPrinter(this);
+ case MIXED:
+ return SettingsCommandMixed.helpPrinter();
+ }
+ throw new AssertionError();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
index bc663f5..a9e669c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
@@ -31,6 +31,7 @@ abstract class Option
abstract String shortDisplay();
abstract String longDisplay();
abstract List<String> multiLineDisplay();
+ abstract boolean setByUser();
public int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/OptionDataGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDataGen.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDataGen.java
index f8ced72..bde2b10 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDataGen.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDataGen.java
@@ -83,6 +83,11 @@ class OptionDataGen extends Option
return factory != null || defaultFactory != null;
}
+ public boolean setByUser()
+ {
+ return factory != null;
+ }
+
@Override
public String shortDisplay()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
index feaf017..b84bbc2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
@@ -43,11 +43,13 @@ class OptionDistribution extends Option
final String prefix;
private String spec;
private final String defaultSpec;
+ private final String description;
- public OptionDistribution(String prefix, String defaultSpec)
+ public OptionDistribution(String prefix, String defaultSpec, String description)
{
this.prefix = prefix;
this.defaultSpec = defaultSpec;
+ this.description = description;
}
@Override
@@ -88,7 +90,7 @@ class OptionDistribution extends Option
public String longDisplay()
{
- return shortDisplay() + ": Specify a mathematical distribution";
+ return shortDisplay() + ": " + description;
}
@Override
@@ -105,10 +107,15 @@ class OptionDistribution extends Option
);
}
+ boolean setByUser()
+ {
+ return spec != null;
+ }
+
@Override
public String shortDisplay()
{
- return prefix + "DIST(?)";
+ return (defaultSpec != null ? "[" : "") + prefix + "DIST(?)" + (defaultSpec != null ? "]" : "");
}
private static final Map<String, Impl> LOOKUP;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
index 7074dc6..60faad8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
@@ -100,7 +100,7 @@ abstract class OptionMulti extends Option
StringBuilder sb = new StringBuilder();
sb.append(name);
sb.append("(");
- for (Option option : options())
+ for (Option option : delegate.options())
{
sb.append(option);
sb.append(",");
@@ -112,7 +112,7 @@ abstract class OptionMulti extends Option
@Override
public String shortDisplay()
{
- return name + "(?)";
+ return (happy() ? "[" : "") + name + "(?)" + (happy() ? "]" : "");
}
@Override
@@ -121,7 +121,7 @@ abstract class OptionMulti extends Option
StringBuilder sb = new StringBuilder();
sb.append(name);
sb.append("(");
- for (Option opt : options())
+ for (Option opt : delegate.options())
{
sb.append(opt.shortDisplay());
}
@@ -181,6 +181,37 @@ abstract class OptionMulti extends Option
{
return Collections.emptyList();
}
- };
+
+ boolean setByUser()
+ {
+ return !options.isEmpty();
+ }
+ }
+
+ List<Option> optionsSetByUser()
+ {
+ List<Option> r = new ArrayList<>();
+ for (Option option : delegate.options())
+ if (option.setByUser())
+ r.add(option);
+ return r;
+ }
+
+ List<Option> defaultOptions()
+ {
+ List<Option> r = new ArrayList<>();
+ for (Option option : delegate.options())
+ if (!option.setByUser() && option.happy())
+ r.add(option);
+ return r;
+ }
+
+ boolean setByUser()
+ {
+ for (Option option : delegate.options())
+ if (option.setByUser())
+ return true;
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
index 88665ab..06ec9b8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
@@ -77,7 +77,7 @@ class OptionReplication extends OptionMulti
{
Class<?> clazz = Class.forName(fullname);
if (!AbstractReplicationStrategy.class.isAssignableFrom(clazz))
- throw new RuntimeException();
+ throw new IllegalArgumentException(clazz + " is not a replication strategy");
strategy = fullname;
break;
} catch (Exception _)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/OptionSimple.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionSimple.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionSimple.java
index 2a9738a..9365e45 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionSimple.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionSimple.java
@@ -33,7 +33,7 @@ import com.google.common.base.Function;
class OptionSimple extends Option
{
- private final String displayPrefix;
+ final String displayPrefix;
private final Pattern matchPrefix;
private final String defaultValue;
private final Function<String, String> valueAdapter;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/SettingsColumn.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsColumn.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsColumn.java
index 235d143..7e20ec6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsColumn.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsColumn.java
@@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.stress.generatedata.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* For parsing column options
@@ -39,12 +40,13 @@ public class SettingsColumn implements Serializable
public final int maxColumnsPerKey;
public final List<ByteBuffer> names;
+ public final List<String> namestrs;
public final String comparator;
public final boolean useTimeUUIDComparator;
public final int superColumns;
public final boolean useSuperColumns;
public final boolean variableColumnCount;
-
+ public final boolean slice;
private final DistributionFactory sizeDistribution;
private final DistributionFactory countDistribution;
private final DataGenFactory dataGenFactory;
@@ -95,11 +97,12 @@ public class SettingsColumn implements Serializable
comparator = TypeParser.parse(this.comparator);
} catch (Exception e)
{
- throw new IllegalStateException(e);
+ throw new IllegalArgumentException(this.comparator + " is not a valid type");
}
final String[] names = name.name.value().split(",");
this.names = new ArrayList<>(names.length);
+ this.namestrs = Arrays.asList(names);
for (String columnName : names)
this.names.add(comparator.fromString(columnName));
@@ -117,10 +120,21 @@ public class SettingsColumn implements Serializable
else
{
this.countDistribution = count.count.get();
- this.names = null;
+ ByteBuffer[] names = new ByteBuffer[(int) countDistribution.get().maxValue()];
+ String[] namestrs = new String[(int) countDistribution.get().maxValue()];
+ for (int i = 0 ; i < names.length ; i++)
+ {
+ names[i] = ByteBufferUtil.bytes("C" + i);
+ namestrs[i] = "C" + i;
+ }
+ this.names = Arrays.asList(names);
+ this.namestrs = Arrays.asList(namestrs);
}
maxColumnsPerKey = (int) countDistribution.get().maxValue();
variableColumnCount = countDistribution.get().minValue() < maxColumnsPerKey;
+ // TODO: should warn that we always slice for useTimeUUIDComparator?
+ slice = options.slice.setByUser() || useTimeUUIDComparator;
+ // TODO: with useTimeUUIDComparator, should we still try to select a random start for reads if possible?
}
public RowGen newRowGen()
@@ -134,7 +148,8 @@ public class SettingsColumn implements Serializable
{
final OptionSimple superColumns = new OptionSimple("super=", "[0-9]+", "0", "Number of super columns to use (no super columns used if not specified)", false);
final OptionSimple comparator = new OptionSimple("comparator=", "TimeUUIDType|AsciiType|UTF8Type", "AsciiType", "Column Comparator to use", false);
- final OptionDistribution size = new OptionDistribution("size=", "FIXED(34)");
+ final OptionSimple slice = new OptionSimple("slice", "", null, "If set, range slices will be used for reads, otherwise a names query will be", false);
+ final OptionDistribution size = new OptionDistribution("size=", "FIXED(34)", "Cell size distribution");
final OptionDataGen generator = new OptionDataGen("data=", "REPEAT(50)");
}
@@ -145,18 +160,18 @@ public class SettingsColumn implements Serializable
@Override
public List<? extends Option> options()
{
- return Arrays.asList(name, superColumns, comparator, size, generator);
+ return Arrays.asList(name, slice, superColumns, comparator, size, generator);
}
}
private static final class CountOptions extends Options
{
- final OptionDistribution count = new OptionDistribution("n=", "FIXED(5)");
+ final OptionDistribution count = new OptionDistribution("n=", "FIXED(5)", "Cell count distribution, per operation");
@Override
public List<? extends Option> options()
{
- return Arrays.asList(count, superColumns, comparator, size, generator);
+ return Arrays.asList(count, slice, superColumns, comparator, size, generator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
index 18f570c..71b30e4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.stress.generatedata.DistributionFactory;
import org.apache.cassandra.thrift.ConsistencyLevel;
// Generic command settings - common to read/write/etc
@@ -40,6 +41,8 @@ public class SettingsCommand implements Serializable
public final double targetUncertainty;
public final int minimumUncertaintyMeasurements;
public final int maximumUncertaintyMeasurements;
+ public final DistributionFactory add;
+ public final int keysAtOnce;
public SettingsCommand(Command type, GroupedOptions options)
{
@@ -55,6 +58,8 @@ public class SettingsCommand implements Serializable
this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1);
this.ignoreErrors = options.ignoreErrors.setByUser();
this.consistencyLevel = ConsistencyLevel.valueOf(options.consistencyLevel.value().toUpperCase());
+ this.keysAtOnce = Integer.parseInt(options.atOnce.value());
+ this.add = options.add.get();
if (count != null)
{
this.count = Long.parseLong(count.count.value());
@@ -78,31 +83,29 @@ public class SettingsCommand implements Serializable
final OptionSimple retries = new OptionSimple("tries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false);
final OptionSimple ignoreErrors = new OptionSimple("ignore_errors", "", null, "Do not print/log errors", false);
final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY", "ONE", "Consistency level to use", false);
+ final OptionDistribution add = new OptionDistribution("add=", "fixed(1)", "Distribution of value of counter increments");
+ final OptionSimple atOnce = new OptionSimple("at-once=", "[0-9]+", "1000", "Number of keys per operation for multiget", false);
}
static class Count extends Options
{
-
final OptionSimple count = new OptionSimple("n=", "[0-9]+", null, "Number of operations to perform", true);
-
@Override
public List<? extends Option> options()
{
- return Arrays.asList(count, retries, ignoreErrors, consistencyLevel);
+ return Arrays.asList(count, retries, ignoreErrors, consistencyLevel, add, atOnce);
}
}
static class Uncertainty extends Options
{
-
final OptionSimple uncertainty = new OptionSimple("err<", "0\\.[0-9]+", "0.02", "Run until the standard error of the mean is below this fraction", false);
final OptionSimple minMeasurements = new OptionSimple("n>", "[0-9]+", "30", "Run at least this many iterations before accepting uncertainty convergence", false);
final OptionSimple maxMeasurements = new OptionSimple("n<", "[0-9]+", "200", "Run at most this many iterations before accepting uncertainty convergence", false);
-
@Override
public List<? extends Option> options()
{
- return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, retries, ignoreErrors, consistencyLevel);
+ return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, retries, ignoreErrors, consistencyLevel, add, atOnce);
}
}
@@ -120,9 +123,8 @@ public class SettingsCommand implements Serializable
switch (cmd.category)
{
case BASIC:
- return build(cmd, params);
case MULTI:
- return SettingsCommandMulti.build(cmd, params);
+ return build(cmd, params);
case MIXED:
return SettingsCommandMixed.build(params);
}
@@ -153,18 +155,6 @@ public class SettingsCommand implements Serializable
GroupedOptions.printOptions(System.out, type.toLowerCase(), new Uncertainty(), new Count());
}
- static Runnable helpPrinter(final String type)
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
- printHelp(type);
- }
- };
- }
-
static Runnable helpPrinter(final Command type)
{
return new Runnable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e2c6105/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandMixed.java
index bce786a..3f16425 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandMixed.java
@@ -26,11 +26,12 @@ import java.util.List;
import org.apache.cassandra.stress.generatedata.Distribution;
import org.apache.cassandra.stress.generatedata.DistributionFactory;
+
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
// Settings unique to the mixed command type
-public class SettingsCommandMixed extends SettingsCommandMulti
+public class SettingsCommandMixed extends SettingsCommand
{
// Ratios for selecting commands - index for each Command, NaN indicates the command is not requested
@@ -41,21 +42,8 @@ public class SettingsCommandMixed extends SettingsCommandMulti
{
super(Command.MIXED, options.parent);
- OptionSimple[] ratiosIn = options.probabilities.ratios;
- List<Pair<Command, Double>> ratiosOut = new ArrayList<>();
- for (int i = 0 ; i < ratiosIn.length ; i++)
- {
- if (ratiosIn[i] != null && ratiosIn[i].present())
- {
- double d = Double.parseDouble(ratiosIn[i].value());
- if (d > 0)
- ratiosOut.add(new Pair<>(Command.values()[i], d));
- }
- }
-
- ratios = ratiosOut;
clustering = options.clustering.get();
-
+ ratios = options.probabilities.ratios();
if (ratios.size() == 0)
throw new IllegalArgumentException("Must specify at least one command with a non-zero ratio");
}
@@ -144,16 +132,30 @@ public class SettingsCommandMixed extends SettingsCommandMulti
{
return grouping;
}
+
+ List<Pair<Command, Double>> ratios()
+ {
+ List<? extends Option> ratiosIn = setByUser() ? optionsSetByUser() : defaultOptions();
+ List<Pair<Command, Double>> ratiosOut = new ArrayList<>();
+ for (Option opt : ratiosIn)
+ {
+ OptionSimple ratioIn = (OptionSimple) opt;
+ Command command = Command.get(ratioIn.displayPrefix.substring(0, ratioIn.displayPrefix.length() - 1));
+ double d = Double.parseDouble(ratioIn.value());
+ ratiosOut.add(new Pair<>(command, d));
+ }
+ return ratiosOut;
+ }
}
static final class Options extends GroupedOptions
{
- final SettingsCommandMulti.Options parent;
- protected Options(SettingsCommandMulti.Options parent)
+ final SettingsCommand.Options parent;
+ protected Options(SettingsCommand.Options parent)
{
this.parent = parent;
}
- final OptionDistribution clustering = new OptionDistribution("clustering=", "GAUSSIAN(1..10)");
+ final OptionDistribution clustering = new OptionDistribution("clustering=", "GAUSSIAN(1..10)", "Distribution clustering runs of operations of the same kind");
final Probabilities probabilities = new Probabilities();
@Override
@@ -173,8 +175,8 @@ public class SettingsCommandMixed extends SettingsCommandMulti
public static SettingsCommandMixed build(String[] params)
{
GroupedOptions options = GroupedOptions.select(params,
- new Options(new SettingsCommandMulti.Options(new Uncertainty())),
- new Options(new SettingsCommandMulti.Options(new Count())));
+ new Options(new SettingsCommand.Uncertainty()),
+ new Options(new SettingsCommand.Count()));
if (options == null)
{
printHelp();
@@ -187,8 +189,8 @@ public class SettingsCommandMixed extends SettingsCommandMulti
public static void printHelp()
{
GroupedOptions.printOptions(System.out, "mixed",
- new Options(new SettingsCommandMulti.Options(new Uncertainty())),
- new Options(new SettingsCommandMulti.Options(new Count())));
+ new Options(new SettingsCommand.Uncertainty()),
+ new Options(new SettingsCommand.Count()));
}
public static Runnable helpPrinter()