You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/07 19:34:28 UTC
[05/23] Introduce CQL support for stress tool
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
deleted file mode 100644
index 021c4e8..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public final class ThriftRangeSlicer extends Operation
-{
-
- public ThriftRangeSlicer(State state, long index)
- {
- super(state, index);
- }
-
- @Override
- public void run(final ThriftClient client) throws IOException
- {
- final SlicePredicate predicate = new SlicePredicate()
- .setSlice_range(
- new SliceRange(
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- state.settings.columns.maxColumnsPerKey
- )
- );
-
- final ByteBuffer start = getKey();
- final KeyRange range =
- new KeyRange(state.settings.columns.maxColumnsPerKey)
- .setStart_key(start)
- .setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- .setCount(state.settings.command.keysAtOnce);
-
- for (final ColumnParent parent : state.columnParents)
- {
- timeWithRetry(new RunOp()
- {
- private int count = 0;
- @Override
- public boolean run() throws Exception
- {
- return (count = client.get_range_slices(parent, predicate, range, state.settings.command.consistencyLevel).size()) != 0;
- }
-
- @Override
- public String key()
- {
- return new String(range.bufferForStart_key().array());
- }
-
- @Override
- public int keyCount()
- {
- return count;
- }
- });
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
deleted file mode 100644
index dccf469..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SuperColumn;
-
-public final class ThriftReader extends Operation
-{
-
- public ThriftReader(State state, long index)
- {
- super(state, index);
- }
-
- public void run(final ThriftClient client) throws IOException
- {
- final SlicePredicate predicate = slicePredicate();
- final ByteBuffer key = getKey();
- final List<ByteBuffer> expect = state.rowGen.isDeterministic() ? generateColumnValues(key) : null;
- for (final ColumnParent parent : state.columnParents)
- {
- timeWithRetry(new RunOp()
- {
- @Override
- public boolean run() throws Exception
- {
- List<ColumnOrSuperColumn> row = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel);
- if (expect == null)
- return !row.isEmpty();
- if (row == null)
- return false;
- if (!state.settings.columns.useSuperColumns)
- {
- if (row.size() != expect.size())
- return false;
- for (int i = 0 ; i < row.size() ; i++)
- if (!row.get(i).getColumn().bufferForValue().equals(expect.get(i)))
- return false;
- }
- else
- {
- for (ColumnOrSuperColumn col : row)
- {
- SuperColumn superColumn = col.getSuper_column();
- if (superColumn.getColumns().size() != expect.size())
- return false;
- for (int i = 0 ; i < expect.size() ; i++)
- if (!superColumn.getColumns().get(i).bufferForValue().equals(expect.get(i)))
- return false;
- }
- }
- return true;
- }
-
- @Override
- public String key()
- {
- return new String(key.array());
- }
-
- @Override
- public int keyCount()
- {
- return 1;
- }
- });
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
new file mode 100644
index 0000000..f794e75
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public class CqlCounterAdder extends CqlOperation<Integer>
+{
+
+ final Distribution counteradd;
+ public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.COUNTER_WRITE, timer, generator, settings);
+ this.counteradd = counteradd.get();
+ }
+
+ @Override
+ protected String buildQuery()
+ {
+ String counterCF = isCql2() ? type.table : "Counter3";
+
+ StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
+
+ if (isCql2())
+ query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+ query.append(" SET ");
+
+ // TODO : increment distribution subset of columns
+ for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
+ {
+ if (i > 0)
+ query.append(",");
+
+ query.append('C').append(i).append("=C").append(i).append("+?");
+ }
+ query.append(" WHERE KEY=?");
+ return query.toString();
+ }
+
+ @Override
+ protected List<Object> getQueryParameters(byte[] key)
+ {
+ final List<Object> list = new ArrayList<>();
+ for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
+ list.add(counteradd.next());
+ list.add(ByteBuffer.wrap(key));
+ return list;
+ }
+
+ @Override
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
new file mode 100644
index 0000000..94c8faf
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -0,0 +1,74 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public class CqlCounterGetter extends CqlOperation<Integer>
+{
+
+ public CqlCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.COUNTER_READ, timer, generator, settings);
+ }
+
+ @Override
+ protected List<Object> getQueryParameters(byte[] key)
+ {
+ return Collections.<Object>singletonList(ByteBuffer.wrap(key));
+ }
+
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("SELECT ");
+
+ // TODO: obey slice/noslice option (instead of always slicing)
+ if (isCql2())
+ query.append("FIRST ").append(settings.columns.maxColumnsPerKey).append(" ''..''");
+ else
+ query.append("*");
+
+ String counterCF = isCql2() ? type.table : "Counter3";
+
+ query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
+
+ if (isCql2())
+ query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+ return query.append(" WHERE KEY=?").toString();
+ }
+
+ @Override
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ return new CqlRunOpTestNonEmpty(client, query, queryId, params, key);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
new file mode 100644
index 0000000..c422f2b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -0,0 +1,79 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class CqlInserter extends CqlOperation<Integer>
+{
+
+ public CqlInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.WRITE, timer, generator, settings);
+ }
+
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(type.table));
+
+ if (isCql2())
+ query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+
+ query.append(" SET ");
+
+ for (int i = 0 ; i < settings.columns.maxColumnsPerKey ; i++)
+ {
+ if (i > 0)
+ query.append(',');
+
+ query.append(wrapInQuotesIfRequired(settings.columns.namestrs.get(i))).append(" = ?");
+ }
+
+ query.append(" WHERE KEY=?");
+ return query.toString();
+ }
+
+ @Override
+ protected List<Object> getQueryParameters(byte[] key)
+ {
+ final ArrayList<Object> queryParams = new ArrayList<>();
+ List<ByteBuffer> values = getColumnValues();
+ queryParams.addAll(values);
+ queryParams.add(ByteBuffer.wrap(key));
+ return queryParams;
+ }
+
+ @Override
+ protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
new file mode 100644
index 0000000..0264cd1
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressMetrics;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.ConnectionStyle;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public abstract class CqlOperation<V> extends PredefinedOperation
+{
+
+ protected abstract List<Object> getQueryParameters(byte[] key);
+ protected abstract String buildQuery();
+ protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key);
+
+ public CqlOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(type, timer, generator, settings);
+ if (settings.columns.variableColumnCount)
+ throw new IllegalStateException("Variable column counts are not implemented for CQL");
+ }
+
+ protected CqlRunOp<V> run(final ClientWrapper client, final List<Object> queryParams, final ByteBuffer key) throws IOException
+ {
+ final CqlRunOp<V> op;
+ if (settings.mode.style == ConnectionStyle.CQL_PREPARED)
+ {
+ final Object id;
+ Object idobj = getCqlCache();
+ if (idobj == null)
+ {
+ try
+ {
+ id = client.createPreparedStatement(buildQuery());
+ } catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ storeCqlCache(id);
+ }
+ else
+ id = idobj;
+
+ op = buildRunOp(client, null, id, queryParams, key);
+ }
+ else
+ {
+ final String query;
+ Object qobj = getCqlCache();
+ if (qobj == null)
+ storeCqlCache(query = buildQuery());
+ else
+ query = qobj.toString();
+
+ op = buildRunOp(client, query, null, queryParams, key);
+ }
+
+ timeWithRetry(op);
+ return op;
+ }
+
+ protected void run(final ClientWrapper client) throws IOException
+ {
+ final byte[] key = getKey().array();
+ final List<Object> queryParams = getQueryParameters(key);
+ run(client, queryParams, ByteBuffer.wrap(key));
+ }
+
+ // Classes to process Cql results
+
+ // Always succeeds so long as the query executes without error; provides a keyCount to increment on instantiation
+ protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
+ {
+
+ final int keyCount;
+
+ protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, int keyCount)
+ {
+ super(client, query, queryId, RowCountHandler.INSTANCE, params, key);
+ this.keyCount = keyCount;
+ }
+
+ @Override
+ public boolean validate(Integer result)
+ {
+ return true;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return keyCount;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return keyCount;
+ }
+ }
+
+ // Succeeds so long as the result set is nonempty, and the query executes without error
+ protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
+ {
+
+ protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ super(client, query, queryId, RowCountHandler.INSTANCE, params, key);
+ }
+
+ @Override
+ public boolean validate(Integer result)
+ {
+ return result > 0;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return result;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return result;
+ }
+ }
+
+ // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing
+ protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
+ {
+
+ protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ super(client, query, queryId, KeysHandler.INSTANCE, params, key);
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return result.length;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return result.length;
+ }
+ }
+
+ protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
+ {
+
+ final List<List<ByteBuffer>> expect;
+
+ // a null value for an item in expect means we just check the row is present
+ protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key, List<List<ByteBuffer>> expect)
+ {
+ super(client, query, queryId, RowsHandler.INSTANCE, params, key);
+ this.expect = expect;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return result == null ? 0 : result.length;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return result == null ? 0 : result.length;
+ }
+
+ public boolean validate(ByteBuffer[][] result)
+ {
+ if (result.length != expect.size())
+ return false;
+ for (int i = 0 ; i < result.length ; i++)
+ if (expect.get(i) != null && !expect.get(i).equals(Arrays.asList(result[i])))
+ return false;
+ return true;
+ }
+ }
+
+ // Cql
+ protected abstract class CqlRunOp<V> implements RunOp
+ {
+
+ final ClientWrapper client;
+ final String query;
+ final Object queryId;
+ final List<Object> params;
+ final ByteBuffer key;
+ final ResultHandler<V> handler;
+ V result;
+
+ private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<Object> params, ByteBuffer key)
+ {
+ this.client = client;
+ this.query = query;
+ this.queryId = queryId;
+ this.handler = handler;
+ this.params = params;
+ this.key = key;
+ }
+
+ @Override
+ public boolean run() throws Exception
+ {
+ return queryId != null
+ ? validate(result = client.execute(queryId, key, params, handler))
+ : validate(result = client.execute(query, key, params, handler));
+ }
+
+ public abstract boolean validate(V result);
+
+ }
+
+
+ /// LOTS OF WRAPPING/UNWRAPPING NONSENSE
+
+
+ @Override
+ public void run(final ThriftClient client) throws IOException
+ {
+ run(wrap(client));
+ }
+
+ @Override
+ public void run(SimpleClient client) throws IOException
+ {
+ run(wrap(client));
+ }
+
+ @Override
+ public void run(JavaDriverClient client) throws IOException
+ {
+ run(wrap(client));
+ }
+
+ public ClientWrapper wrap(ThriftClient client)
+ {
+ return isCql3()
+ ? new Cql3CassandraClientWrapper(client)
+ : new Cql2CassandraClientWrapper(client);
+
+ }
+
+ public ClientWrapper wrap(JavaDriverClient client)
+ {
+ return new JavaDriverWrapper(client);
+ }
+
+ public ClientWrapper wrap(SimpleClient client)
+ {
+ return new SimpleClientWrapper(client);
+ }
+
+ protected interface ClientWrapper
+ {
+ Object createPreparedStatement(String cqlQuery) throws TException;
+ <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
+ <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
+ }
+
+ private final class JavaDriverWrapper implements ClientWrapper
+ {
+ final JavaDriverClient client;
+ private JavaDriverWrapper(JavaDriverClient client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+ {
+ String formattedQuery = formatCqlQuery(query, queryParams, isCql3());
+ return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+ }
+
+ @Override
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+ {
+ return handler.javaDriverHandler().apply(
+ client.executePrepared(
+ (PreparedStatement) preparedStatementId,
+ queryParams,
+ ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+ }
+
+ @Override
+ public Object createPreparedStatement(String cqlQuery)
+ {
+ return client.prepare(cqlQuery);
+ }
+ }
+
+ private final class SimpleClientWrapper implements ClientWrapper
+ {
+ final SimpleClient client;
+ private SimpleClientWrapper(SimpleClient client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+ {
+ String formattedQuery = formatCqlQuery(query, queryParams, isCql3());
+ return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+ }
+
+ @Override
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
+ {
+ return handler.thriftHandler().apply(
+ client.executePrepared(
+ (byte[]) preparedStatementId,
+ toByteBufferParams(queryParams),
+ ThriftConversion.fromThrift(settings.command.consistencyLevel)));
+ }
+
+ @Override
+ public Object createPreparedStatement(String cqlQuery)
+ {
+ return client.prepare(cqlQuery).statementId.bytes;
+ }
+ }
+
+ // client wrapper for Cql3
+ private final class Cql3CassandraClientWrapper implements ClientWrapper
+ {
+ final ThriftClient client;
+ private Cql3CassandraClientWrapper(ThriftClient client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+ {
+ String formattedQuery = formatCqlQuery(query, queryParams, true);
+ return handler.simpleNativeHandler().apply(
+ client.execute_cql3_query(formattedQuery, key, Compression.NONE, settings.command.consistencyLevel)
+ );
+ }
+
+ @Override
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+ {
+ Integer id = (Integer) preparedStatementId;
+ return handler.simpleNativeHandler().apply(
+ client.execute_prepared_cql3_query(id, key, toByteBufferParams(queryParams), settings.command.consistencyLevel)
+ );
+ }
+
+ @Override
+ public Object createPreparedStatement(String cqlQuery) throws TException
+ {
+ return client.prepare_cql3_query(cqlQuery, Compression.NONE);
+ }
+ }
+
+ // client wrapper for Cql2
+ private final class Cql2CassandraClientWrapper implements ClientWrapper
+ {
+ final ThriftClient client;
+ private Cql2CassandraClientWrapper(ThriftClient client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+ {
+ String formattedQuery = formatCqlQuery(query, queryParams, false);
+ return handler.simpleNativeHandler().apply(
+ client.execute_cql_query(formattedQuery, key, Compression.NONE)
+ );
+ }
+
+ @Override
+ public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
+ {
+ Integer id = (Integer) preparedStatementId;
+ return handler.simpleNativeHandler().apply(
+ client.execute_prepared_cql_query(id, key, toByteBufferParams(queryParams))
+ );
+ }
+
+ @Override
+ public Object createPreparedStatement(String cqlQuery) throws TException
+ {
+ return client.prepare_cql_query(cqlQuery, Compression.NONE);
+ }
+ }
+
+ // interface for building functions to standardise results from each client
+ protected static interface ResultHandler<V>
+ {
+ Function<ResultSet, V> javaDriverHandler();
+ Function<ResultMessage, V> thriftHandler();
+ Function<CqlResult, V> simpleNativeHandler();
+ }
+
+ protected static class RowCountHandler implements ResultHandler<Integer>
+ {
+ static final RowCountHandler INSTANCE = new RowCountHandler();
+
+ @Override
+ public Function<ResultSet, Integer> javaDriverHandler()
+ {
+ return new Function<ResultSet, Integer>()
+ {
+ @Override
+ public Integer apply(ResultSet rows)
+ {
+ if (rows == null)
+ return 0;
+ return rows.all().size();
+ }
+ };
+ }
+
+ @Override
+ public Function<ResultMessage, Integer> thriftHandler()
+ {
+ return new Function<ResultMessage, Integer>()
+ {
+ @Override
+ public Integer apply(ResultMessage result)
+ {
+ return result instanceof ResultMessage.Rows ? ((ResultMessage.Rows) result).result.size() : 0;
+ }
+ };
+ }
+
+ @Override
+ public Function<CqlResult, Integer> simpleNativeHandler()
+ {
+ return new Function<CqlResult, Integer>()
+ {
+
+ @Override
+ public Integer apply(CqlResult result)
+ {
+ switch (result.getType())
+ {
+ case ROWS:
+ return result.getRows().size();
+ default:
+ return 1;
+ }
+ }
+ };
+ }
+
+ }
+
+ // Processes results from each client into an array of all key bytes returned
+ protected static final class RowsHandler implements ResultHandler<ByteBuffer[][]>
+ {
+ static final RowsHandler INSTANCE = new RowsHandler();
+
+ @Override
+ public Function<ResultSet, ByteBuffer[][]> javaDriverHandler()
+ {
+ return new Function<ResultSet, ByteBuffer[][]>()
+ {
+
+ @Override
+ public ByteBuffer[][] apply(ResultSet result)
+ {
+ if (result == null)
+ return new ByteBuffer[0][];
+ List<Row> rows = result.all();
+
+ ByteBuffer[][] r = new ByteBuffer[rows.size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ {
+ Row row = rows.get(i);
+ r[i] = new ByteBuffer[row.getColumnDefinitions().size()];
+ for (int j = 0 ; j < row.getColumnDefinitions().size() ; j++)
+ r[i][j] = row.getBytes(j);
+ }
+ return r;
+ }
+ };
+ }
+
+ @Override
+ public Function<ResultMessage, ByteBuffer[][]> thriftHandler()
+ {
+ return new Function<ResultMessage, ByteBuffer[][]>()
+ {
+
+ @Override
+ public ByteBuffer[][] apply(ResultMessage result)
+ {
+ if (!(result instanceof ResultMessage.Rows))
+ return new ByteBuffer[0][];
+
+ ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+ ByteBuffer[][] r = new ByteBuffer[rows.result.size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ {
+ List<ByteBuffer> row = rows.result.rows.get(i);
+ r[i] = new ByteBuffer[row.size()];
+ for (int j = 0 ; j < row.size() ; j++)
+ r[i][j] = row.get(j);
+ }
+ return r;
+ }
+ };
+ }
+
+ @Override
+ public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler()
+ {
+ return new Function<CqlResult, ByteBuffer[][]>()
+ {
+
+ @Override
+ public ByteBuffer[][] apply(CqlResult result)
+ {
+ ByteBuffer[][] r = new ByteBuffer[result.getRows().size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ {
+ CqlRow row = result.getRows().get(i);
+ r[i] = new ByteBuffer[row.getColumns().size()];
+ for (int j = 0 ; j < r[i].length ; j++)
+ r[i][j] = ByteBuffer.wrap(row.getColumns().get(j).getValue());
+ }
+ return r;
+ }
+ };
+ }
+
+ }
+ // Processes results from each client into an array of all key bytes returned
+ protected static final class KeysHandler implements ResultHandler<byte[][]>
+ {
+ static final KeysHandler INSTANCE = new KeysHandler();
+
+ @Override
+ public Function<ResultSet, byte[][]> javaDriverHandler()
+ {
+ return new Function<ResultSet, byte[][]>()
+ {
+
+ @Override
+ public byte[][] apply(ResultSet result)
+ {
+
+ if (result == null)
+ return new byte[0][];
+ List<Row> rows = result.all();
+ byte[][] r = new byte[rows.size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ r[i] = rows.get(i).getBytes(0).array();
+ return r;
+ }
+ };
+ }
+
+ @Override
+ public Function<ResultMessage, byte[][]> thriftHandler()
+ {
+ return new Function<ResultMessage, byte[][]>()
+ {
+
+ @Override
+ public byte[][] apply(ResultMessage result)
+ {
+ if (result instanceof ResultMessage.Rows)
+ {
+ ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+ byte[][] r = new byte[rows.result.size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ r[i] = rows.result.rows.get(i).get(0).array();
+ return r;
+ }
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public Function<CqlResult, byte[][]> simpleNativeHandler()
+ {
+ return new Function<CqlResult, byte[][]>()
+ {
+
+ @Override
+ public byte[][] apply(CqlResult result)
+ {
+ byte[][] r = new byte[result.getRows().size()][];
+ for (int i = 0 ; i < r.length ; i++)
+ r[i] = result.getRows().get(i).getKey();
+ return r;
+ }
+ };
+ }
+
+ }
+
+ private static String getUnQuotedCqlBlob(ByteBuffer term, boolean isCQL3)
+ {
+ return isCQL3
+ ? "0x" + ByteBufferUtil.bytesToHex(term)
+ : ByteBufferUtil.bytesToHex(term);
+ }
+
+ /**
+ * Constructs a CQL query string by replacing instances of the character
+ * '?', with the corresponding parameter.
+ *
+ * @param query base query string to format
+ * @param parms sequence of string query parameters
+ * @return formatted CQL query string
+ */
+ private static String formatCqlQuery(String query, List<Object> parms, boolean isCql3)
+ {
+ int marker, position = 0;
+ StringBuilder result = new StringBuilder();
+
+ if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
+ return query;
+
+ for (Object parm : parms)
+ {
+ result.append(query.substring(position, marker));
+ if (parm instanceof ByteBuffer)
+ result.append(getUnQuotedCqlBlob((ByteBuffer) parm, isCql3));
+ else if (parm instanceof Long)
+ result.append(parm.toString());
+ else throw new AssertionError();
+
+ position = marker + 1;
+ if (-1 == (marker = query.indexOf('?', position + 1)))
+ break;
+ }
+
+ if (position < query.length())
+ result.append(query.substring(position));
+
+ return result.toString();
+ }
+
+ private static List<ByteBuffer> toByteBufferParams(List<Object> params)
+ {
+ List<ByteBuffer> r = new ArrayList<>();
+ for (Object param : params)
+ {
+ if (param instanceof ByteBuffer)
+ r.add((ByteBuffer) param);
+ else if (param instanceof Long)
+ r.add(ByteBufferUtil.bytes((Long) param));
+ else throw new AssertionError();
+ }
+ return r;
+ }
+
+ protected String wrapInQuotesIfRequired(String string)
+ {
+ return settings.mode.cqlVersion == CqlVersion.CQL3
+ ? "\"" + string + "\""
+ : string;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
new file mode 100644
index 0000000..3a7f75a
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -0,0 +1,87 @@
+package org.apache.cassandra.stress.operations.predefined;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class CqlReader extends CqlOperation<ByteBuffer[][]>
+{
+
+ public CqlReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.READ, timer, generator, settings);
+ }
+
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("SELECT ");
+
+ if (settings.columns.slice)
+ {
+ if (isCql2())
+ query.append("FIRST ").append(settings.columns.maxColumnsPerKey).append(" ''..''");
+ else
+ query.append("*");
+ }
+ else
+ {
+ for (int i = 0; i < settings.columns.maxColumnsPerKey ; i++)
+ {
+ if (i > 0)
+ query.append(",");
+ query.append(wrapInQuotesIfRequired(settings.columns.namestrs.get(i)));
+ }
+ }
+
+ query.append(" FROM ").append(wrapInQuotesIfRequired(type.table));
+
+ if (isCql2())
+ query.append(" USING CONSISTENCY ").append(settings.command.consistencyLevel);
+ query.append(" WHERE KEY=?");
+ return query.toString();
+ }
+
+ @Override
+ protected List<Object> getQueryParameters(byte[] key)
+ {
+ return Collections.<Object>singletonList(ByteBuffer.wrap(key));
+ }
+
+ @Override
+ protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
+ {
+ List<ByteBuffer> expectRow = getColumnValues();
+ return new CqlRunOpMatchResults(client, query, queryId, params, key, Arrays.asList(expectRow));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
new file mode 100644
index 0000000..7f6412b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressMetrics;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.DistributionFixed;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public abstract class PredefinedOperation extends Operation
+{
+ public final Command type;
+ private final Distribution columnCount;
+ private Object cqlCache;
+
+ public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(timer, generator, settings, new DistributionFixed(1));
+ this.type = type;
+ this.columnCount = settings.columns.countDistribution.get();
+ }
+
+ public boolean isCql3()
+ {
+ return settings.mode.cqlVersion == CqlVersion.CQL3;
+ }
+ public boolean isCql2()
+ {
+ return settings.mode.cqlVersion == CqlVersion.CQL2;
+ }
+ public Object getCqlCache()
+ {
+ return cqlCache;
+ }
+ public void storeCqlCache(Object val)
+ {
+ cqlCache = val;
+ }
+
+ protected ByteBuffer getKey()
+ {
+ return (ByteBuffer) partitions.get(0).getPartitionKey(0);
+ }
+
+ final class ColumnSelection
+ {
+ final int[] indices;
+ final int lb, ub;
+ private ColumnSelection(int[] indices, int lb, int ub)
+ {
+ this.indices = indices;
+ this.lb = lb;
+ this.ub = ub;
+ }
+
+ public <V> List<V> select(List<V> in)
+ {
+ List<V> out = new ArrayList<>();
+ if (indices != null)
+ {
+ for (int i : indices)
+ out.add(in.get(i));
+ }
+ else
+ {
+ out.addAll(in.subList(lb, ub));
+ }
+ return out;
+ }
+
+ int count()
+ {
+ return indices != null ? indices.length : ub - lb;
+ }
+
+ SlicePredicate predicate()
+ {
+ final SlicePredicate predicate = new SlicePredicate();
+ if (indices == null)
+ {
+ predicate.setSlice_range(new SliceRange()
+ .setStart(settings.columns.names.get(lb))
+ .setFinish(new byte[] {})
+ .setReversed(false)
+ .setCount(count())
+ );
+ }
+ else
+ predicate.setColumn_names(select(settings.columns.names));
+ return predicate;
+
+ }
+ }
+
+ public String toString()
+ {
+ return type.toString();
+ }
+
+ ColumnSelection select()
+ {
+ if (settings.columns.slice)
+ {
+ int count = (int) columnCount.next();
+ int start;
+ if (count == settings.columns.maxColumnsPerKey)
+ start = 0;
+ else
+ start = 1 + ThreadLocalRandom.current().nextInt(settings.columns.maxColumnsPerKey - count);
+ return new ColumnSelection(null, start, start + count);
+ }
+
+ int count = (int) columnCount.next();
+ int totalCount = settings.columns.names.size();
+ if (count == settings.columns.names.size())
+ return new ColumnSelection(null, 0, count);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ int[] indices = new int[count];
+ int c = 0, o = 0;
+ while (c < count && count + o < totalCount)
+ {
+ int leeway = totalCount - (count + o);
+ int spreadover = count - c;
+ o += Math.round(rnd.nextDouble() * (leeway / (double) spreadover));
+ indices[c] = o + c;
+ c++;
+ }
+ while (c < count)
+ {
+ indices[c] = o + c;
+ c++;
+ }
+ return new ColumnSelection(indices, 0, 0);
+ }
+
+ protected List<ByteBuffer> getColumnValues()
+ {
+ return getColumnValues(new ColumnSelection(null, 0, settings.columns.names.size()));
+ }
+
+ protected List<ByteBuffer> getColumnValues(ColumnSelection columns)
+ {
+ Row row = partitions.get(0).iterator(1).batch(1f).iterator().next();
+ ByteBuffer[] r = new ByteBuffer[columns.count()];
+ int c = 0;
+ if (columns.indices != null)
+ for (int i : columns.indices)
+ r[c++] = (ByteBuffer) row.get(i);
+ else
+ for (int i = columns.lb ; i < columns.ub ; i++)
+ r[c++] = (ByteBuffer) row.get(i);
+ return Arrays.asList(r);
+ }
+
+ public static Operation operation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings, DistributionFactory counteradd)
+ {
+ switch (type)
+ {
+ case READ:
+ switch(settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftReader(timer, generator, settings);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlReader(timer, generator, settings);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+
+ case COUNTER_READ:
+ switch(settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftCounterGetter(timer, generator, settings);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlCounterGetter(timer, generator, settings);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ case WRITE:
+
+ switch(settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftInserter(timer, generator, settings);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlInserter(timer, generator, settings);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ case COUNTER_WRITE:
+ switch(settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftCounterAdder(counteradd, timer, generator, settings);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlCounterAdder(counteradd, timer, generator, settings);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
new file mode 100644
index 0000000..ee766c3
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.*;
+
+public class ThriftCounterAdder extends PredefinedOperation
+{
+
+ final Distribution counteradd;
+ public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.COUNTER_WRITE, timer, generator, settings);
+ this.counteradd = counteradd.get();
+ }
+
+ public void run(final ThriftClient client) throws IOException
+ {
+ List<CounterColumn> columns = new ArrayList<>();
+ for (ByteBuffer name : select().select(settings.columns.names))
+ columns.add(new CounterColumn(name, counteradd.next()));
+
+ List<Mutation> mutations = new ArrayList<>(columns.size());
+ for (CounterColumn c : columns)
+ {
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
+ mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+ }
+ Map<String, List<Mutation>> row = Collections.singletonMap(type.table, mutations);
+
+ final ByteBuffer key = getKey();
+ final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+ timeWithRetry(new RunOp()
+ {
+ @Override
+ public boolean run() throws Exception
+ {
+ client.batch_mutate(record, settings.command.consistencyLevel);
+ return true;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return 1;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return 1;
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
new file mode 100644
index 0000000..10c6aab
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+
+public class ThriftCounterGetter extends PredefinedOperation
+{
+ public ThriftCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.COUNTER_READ, timer, generator, settings);
+ }
+
+ public void run(final ThriftClient client) throws IOException
+ {
+ final SlicePredicate predicate = select().predicate();
+ final ByteBuffer key = getKey();
+ timeWithRetry(new RunOp()
+ {
+ @Override
+ public boolean run() throws Exception
+ {
+ List<?> r = client.get_slice(key, new ColumnParent(type.table), predicate, settings.command.consistencyLevel);
+ return r != null && r.size() > 0;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return 1;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return 1;
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
new file mode 100644
index 0000000..5c2acfe
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class ThriftInserter extends PredefinedOperation
+{
+
+ public ThriftInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.WRITE, timer, generator, settings);
+ }
+
+ public void run(final ThriftClient client) throws IOException
+ {
+ final ByteBuffer key = getKey();
+ final List<Column> columns = getColumns();
+
+ List<Mutation> mutations = new ArrayList<>(columns.size());
+ for (Column c : columns)
+ {
+ ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
+ mutations.add(new Mutation().setColumn_or_supercolumn(column));
+ }
+ Map<String, List<Mutation>> row = Collections.singletonMap(type.table, mutations);
+
+ final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+ timeWithRetry(new RunOp()
+ {
+ @Override
+ public boolean run() throws Exception
+ {
+ client.batch_mutate(record, settings.command.consistencyLevel);
+ return true;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return 1;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return 1;
+ }
+ });
+ }
+
+ protected List<Column> getColumns()
+ {
+ final ColumnSelection selection = select();
+ final List<ByteBuffer> values = getColumnValues(selection);
+ final List<Column> columns = new ArrayList<>(values.size());
+ final List<ByteBuffer> names = select().select(settings.columns.names);
+ for (int i = 0 ; i < values.size() ; i++)
+ columns.add(new Column(names.get(i))
+ .setValue(values.get(i))
+ .setTimestamp(FBUtilities.timestampMicros()));
+ return columns;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
new file mode 100644
index 0000000..276d8c5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations.predefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SuperColumn;
+
+public final class ThriftReader extends PredefinedOperation
+{
+
+ public ThriftReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+ {
+ super(Command.READ, timer, generator, settings);
+ }
+
+ public void run(final ThriftClient client) throws IOException
+ {
+ final ColumnSelection select = select();
+ final ByteBuffer key = getKey();
+ final List<ByteBuffer> expect = getColumnValues(select);
+ timeWithRetry(new RunOp()
+ {
+ @Override
+ public boolean run() throws Exception
+ {
+ List<ColumnOrSuperColumn> row = client.get_slice(key, new ColumnParent(type.table), select.predicate(), settings.command.consistencyLevel);
+ if (expect == null)
+ return !row.isEmpty();
+ if (row == null)
+ return false;
+ if (row.size() != expect.size())
+ return false;
+ for (int i = 0 ; i < row.size() ; i++)
+ if (!row.get(i).getColumn().bufferForValue().equals(expect.get(i)))
+ return false;
+ return true;
+ }
+
+ @Override
+ public int partitionCount()
+ {
+ return 1;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return 1;
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
new file mode 100644
index 0000000..7c5efac
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -0,0 +1,144 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+public class SchemaInsert extends SchemaStatement
+{
+
+ private final BatchStatement.Type batchType;
+ private final RatioDistribution perVisit;
+ private final RatioDistribution perBatch;
+
+ public SchemaInsert(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount, RatioDistribution perVisit, RatioDistribution perBatch, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
+ {
+ super(timer, generator, settings, partitionCount, statement, thriftId, cl, ValidationType.NOT_FAIL);
+ this.batchType = batchType;
+ this.perVisit = perVisit;
+ this.perBatch = perBatch;
+ }
+
+ private class JavaDriverRun extends Runner
+ {
+ final JavaDriverClient client;
+
+ private JavaDriverRun(JavaDriverClient client)
+ {
+ this.client = client;
+ }
+
+ public boolean run() throws Exception
+ {
+ Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
+ for (int i = 0 ; i < iterators.length ; i++)
+ iterators[i] = partitions.get(i).iterator(perVisit.next());
+ List<BoundStatement> stmts = new ArrayList<>();
+ partitionCount = partitions.size();
+
+ boolean done;
+ do
+ {
+ done = true;
+ stmts.clear();
+ for (Partition.RowIterator iterator : iterators)
+ {
+ if (iterator.done())
+ continue;
+
+ for (Row row : iterator.batch(perBatch.next()))
+ stmts.add(bindRow(row));
+
+ done &= iterator.done();
+ }
+
+ rowCount += stmts.size();
+
+ Statement stmt;
+ if (stmts.size() == 1)
+ {
+ stmt = stmts.get(0);
+ }
+ else
+ {
+ BatchStatement batch = new BatchStatement(batchType);
+ batch.setConsistencyLevel(JavaDriverClient.from(cl));
+ batch.addAll(stmts);
+ stmt = batch;
+ }
+ validate(client.getSession().execute(stmt));
+
+ } while (!done);
+
+ return true;
+ }
+ }
+
+ private class ThriftRun extends Runner
+ {
+ final ThriftClient client;
+
+ private ThriftRun(ThriftClient client)
+ {
+ this.client = client;
+ }
+
+ public boolean run() throws Exception
+ {
+ Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
+ for (int i = 0 ; i < iterators.length ; i++)
+ iterators[i] = partitions.get(i).iterator(perVisit.next());
+ partitionCount = partitions.size();
+
+ boolean done;
+ do
+ {
+ done = true;
+ for (Partition.RowIterator iterator : iterators)
+ {
+ if (iterator.done())
+ continue;
+
+ for (Row row : iterator.batch(perBatch.next()))
+ {
+ validate(client.execute_prepared_cql3_query(thriftId, iterator.partition().getToken(), thriftRowArgs(row), settings.command.consistencyLevel));
+ rowCount += 1;
+ }
+
+ done &= iterator.done();
+ }
+ } while (!done);
+
+ return true;
+ }
+ }
+
+ @Override
+ public void run(JavaDriverClient client) throws IOException
+ {
+ timeWithRetry(new JavaDriverRun(client));
+ }
+
+ @Override
+ public void run(ThriftClient client) throws IOException
+ {
+ timeWithRetry(new ThriftRun(client));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
new file mode 100644
index 0000000..9cec39b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -0,0 +1,86 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.settings.OptionDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+
+public class SchemaQuery extends SchemaStatement
+{
+
+ public SchemaQuery(Timer timer, PartitionGenerator generator, StressSettings settings, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ValidationType validationType)
+ {
+ super(timer, generator, settings, OptionDistribution.get("fixed(1)").get(), statement, thriftId, cl, validationType);
+ }
+
+ int execute(JavaDriverClient client) throws Exception
+ {
+ return client.getSession().execute(bindRandom(partitions.get(0))).all().size();
+ }
+
+ int execute(ThriftClient client) throws Exception
+ {
+ return client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftRandomArgs(partitions.get(0)), ThriftConversion.toThrift(cl)).getRowsSize();
+ }
+
+ private class JavaDriverRun extends Runner
+ {
+ final JavaDriverClient client;
+
+ private JavaDriverRun(JavaDriverClient client)
+ {
+ this.client = client;
+ }
+
+ public boolean run() throws Exception
+ {
+ ResultSet rs = client.getSession().execute(bindRandom(partitions.get(0)));
+ validate(rs);
+ rowCount = rs.all().size();
+ partitionCount = Math.min(1, rowCount);
+ return true;
+ }
+ }
+
+ private class ThriftRun extends Runner
+ {
+ final ThriftClient client;
+
+ private ThriftRun(ThriftClient client)
+ {
+ this.client = client;
+ }
+
+ public boolean run() throws Exception
+ {
+ CqlResult rs = client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftRandomArgs(partitions.get(0)), ThriftConversion.toThrift(cl));
+ validate(rs);
+ rowCount = rs.getRowsSize();
+ partitionCount = Math.min(1, rowCount);
+ return true;
+ }
+ }
+
+ @Override
+ public void run(JavaDriverClient client) throws IOException
+ {
+ timeWithRetry(new JavaDriverRun(client));
+ }
+
+ @Override
+ public void run(ThriftClient client) throws IOException
+ {
+ timeWithRetry(new ThriftRun(client));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
new file mode 100644
index 0000000..aac40c5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -0,0 +1,164 @@
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.Partition;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.transport.SimpleClient;
+
+public abstract class SchemaStatement extends Operation
+{
+
+ final PartitionGenerator generator;
+ private final PreparedStatement statement;
+ final Integer thriftId;
+ final ConsistencyLevel cl;
+ final ValidationType validationType;
+ private final int[] argumentIndex;
+ private final Object[] bindBuffer;
+ private final Object[][] randomBuffer;
+ private final Random random = new Random();
+
+ public SchemaStatement(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount,
+ PreparedStatement statement, Integer thriftId, ConsistencyLevel cl, ValidationType validationType)
+ {
+ super(timer, generator, settings, partitionCount);
+ this.generator = generator;
+ this.statement = statement;
+ this.thriftId = thriftId;
+ this.cl = cl;
+ this.validationType = validationType;
+ argumentIndex = new int[statement.getVariables().size()];
+ bindBuffer = new Object[argumentIndex.length];
+ randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
+ int i = 0;
+ for (ColumnDefinitions.Definition definition : statement.getVariables())
+ argumentIndex[i++] = generator.indexOf(definition.getName());
+ }
+
+ private int filLRandom(Partition partition)
+ {
+ int c = 0;
+ for (Row row : partition.iterator(randomBuffer.length).batch(1f))
+ {
+ Object[] randomRow = randomBuffer[c++];
+ for (int i = 0 ; i < argumentIndex.length ; i++)
+ randomRow[i] = row.get(argumentIndex[i]);
+ if (c >= randomBuffer.length)
+ break;
+ }
+ return c;
+ }
+
+ BoundStatement bindRandom(Partition partition)
+ {
+ int c = filLRandom(partition);
+ for (int i = 0 ; i < argumentIndex.length ; i++)
+ {
+ int argIndex = argumentIndex[i];
+ bindBuffer[i] = randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i];
+ }
+ return statement.bind(bindBuffer);
+ }
+
+ BoundStatement bindRow(Row row)
+ {
+ for (int i = 0 ; i < argumentIndex.length ; i++)
+ bindBuffer[i] = row.get(argumentIndex[i]);
+ return statement.bind(bindBuffer);
+ }
+
+ List<ByteBuffer> thriftRowArgs(Row row)
+ {
+ List<ByteBuffer> args = new ArrayList<>();
+ for (int i : argumentIndex)
+ args.add(generator.convert(i, row.get(i)));
+ return args;
+ }
+
+ List<ByteBuffer> thriftRandomArgs(Partition partition)
+ {
+ List<ByteBuffer> args = new ArrayList<>();
+ int c = filLRandom(partition);
+ for (int i = 0 ; i < argumentIndex.length ; i++)
+ {
+ int argIndex = argumentIndex[i];
+ args.add(generator.convert(argIndex, randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i]));
+ }
+ return args;
+ }
+
+ void validate(ResultSet rs)
+ {
+ switch (validationType)
+ {
+ case NOT_FAIL:
+ return;
+ case NON_ZERO:
+ if (rs.all().size() == 0)
+ throw new IllegalStateException("Expected non-zero results");
+ break;
+ default:
+ throw new IllegalStateException("Unsupported validation type");
+ }
+ }
+
+ void validate(CqlResult rs)
+ {
+ switch (validationType)
+ {
+ case NOT_FAIL:
+ return;
+ case NON_ZERO:
+ if (rs.getRowsSize() == 0)
+ throw new IllegalStateException("Expected non-zero results");
+ break;
+ default:
+ throw new IllegalStateException("Unsupported validation type");
+ }
+ }
+
+ @Override
+ public void run(SimpleClient client) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ abstract class Runner implements RunOp
+ {
+ int partitionCount;
+ int rowCount;
+
+ @Override
+ public int partitionCount()
+ {
+ return partitionCount;
+ }
+
+ @Override
+ public int rowCount()
+ {
+ return rowCount;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
index ac10014..7138cbb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
@@ -41,19 +41,6 @@ public enum Command
"Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test",
CommandCategory.MIXED
),
- RANGE_SLICE(false, "Standard1", "Super1",
- "Range slice queries - the cluster must first be populated by a write test",
- CommandCategory.MULTI
- ),
- INDEXED_RANGE_SLICE(false, "Standard1", "Super1",
- "Range slice queries through a secondary index. The cluster must first be populated by a write test, with indexing enabled.",
- CommandCategory.BASIC
- ),
- READ_MULTI(false, "Standard1", "Super1",
- "multi_read",
- "Multiple concurrent reads fetching multiple rows at once. The cluster must first be populated by a write test.",
- CommandCategory.MULTI
- ),
COUNTER_WRITE(true, "Counter1", "SuperCounter1",
"counter_add",
"Multiple concurrent updates of counters.",
@@ -64,6 +51,10 @@ public enum Command
"Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.",
CommandCategory.BASIC
),
+ USER(true, null, null,
+ "Interleaving of user provided queries, with configurable ratio and distribution",
+ CommandCategory.USER
+ ),
HELP(false, null, null, "-?", "Print help for a command or option", null),
PRINT(false, null, null, "Inspect the output of a distribution definition", null),
@@ -136,11 +127,12 @@ public enum Command
}
switch (category)
{
+ case USER:
+ return SettingsCommandUser.helpPrinter();
case BASIC:
- case MULTI:
- return SettingsCommand.helpPrinter(this);
+ return SettingsCommandPreDefined.helpPrinter(this);
case MIXED:
- return SettingsCommandMixed.helpPrinter();
+ return SettingsCommandPreDefinedMixed.helpPrinter();
}
throw new AssertionError();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java b/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
index 4372f59..e9dd946 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CommandCategory.java
@@ -24,6 +24,6 @@ package org.apache.cassandra.stress.settings;
public enum CommandCategory
{
BASIC,
- MULTI,
- MIXED
+ MIXED,
+ USER
}