You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/12/24 03:08:43 UTC
[4/6] Improve Stress Tool patch by Benedict;
reviewed by Pavel Yaskevich for CASSANDRA-6199
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
new file mode 100644
index 0000000..1f734be
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.ConnectionStyle;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.ThriftConversion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public abstract class CqlOperation<V> extends Operation
+{
+
+ /**
+ * Supplies the values substituted for the '?' markers of the query, in marker order.
+ *
+ * @param key raw bytes of the key this operation works on
+ * @return the bind values handed to the client together with the query or prepared statement
+ */
+ protected abstract List<ByteBuffer> getQueryParameters(byte[] key);
+ /**
+ * Builds the CQL text for this operation. The result is either prepared on the client or
+ * used directly as the query string; it is stored through state.storeCqlCache(), so it is
+ * only built while state.getCqlCache() returns null.
+ *
+ * @return the query text, with '?' where a parameter is to be substituted
+ */
+ protected abstract String buildQuery();
+ /**
+ * Creates the unit of work that is timed and retried for this operation.
+ * run() passes a null query in prepared mode and a null queryId otherwise.
+ *
+ * @param client wrapper around the concrete client to execute against
+ * @param query CQL text, or null when a prepared statement is used
+ * @param queryId prepared statement handle, or null when the query text is used
+ * @param params bind values for the query
+ * @param id textual form of the key, returned by the op's key() method
+ * @param key the key as a buffer
+ * @return the operation to execute
+ */
+ protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key);
+
+    /**
+     * Creates the operation and rejects column settings that the CQL operations do not support.
+     *
+     * @param state shared state and settings of the stress run
+     * @param idx index of this operation
+     * @throws IllegalStateException if super columns or variable column counts are configured
+     */
+    public CqlOperation(State state, long idx)
+    {
+        super(state, idx);
+
+        // super columns are checked first, then variable column counts
+        if (state.settings.columns.useSuperColumns)
+        {
+            throw new IllegalStateException("Super columns are not implemented for CQL");
+        }
+
+        if (state.settings.columns.variableColumnCount)
+        {
+            throw new IllegalStateException("Variable column counts are not implemented for CQL");
+        }
+    }
+
+    /**
+     * Builds the run-op for one execution, times it with retries, and returns it.
+     * <p>
+     * In CQL_PREPARED mode the cached object is the prepared statement handle; in every other
+     * mode it is the query text. Whichever it is gets created on first use and stored through
+     * state.storeCqlCache().
+     *
+     * @param client wrapper around the concrete client
+     * @param queryParams bind values for the query
+     * @param key key of the operation as a buffer
+     * @param keyid textual form of the key
+     * @return the run-op after it has been passed to timeWithRetry()
+     * @throws IOException declared for callers; a TException raised while preparing is rethrown
+     *                     wrapped in a RuntimeException
+     */
+    protected CqlRunOp<V> run(final ClientWrapper client, final List<ByteBuffer> queryParams, final ByteBuffer key, final String keyid) throws IOException
+    {
+        final boolean usePrepared = state.settings.mode.style == ConnectionStyle.CQL_PREPARED;
+        final Object cached = state.getCqlCache();
+        final CqlRunOp<V> op;
+
+        if (usePrepared)
+        {
+            Object statementId = cached;
+            if (statementId == null)
+            {
+                // nothing cached yet: prepare the statement and remember its handle
+                try
+                {
+                    statementId = client.createPreparedStatement(buildQuery());
+                }
+                catch (TException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                state.storeCqlCache(statementId);
+            }
+            op = buildRunOp(client, null, statementId, queryParams, keyid, key);
+        }
+        else
+        {
+            final String cql;
+            if (cached != null)
+            {
+                cql = cached.toString();
+            }
+            else
+            {
+                // nothing cached yet: build the query text and remember it
+                cql = buildQuery();
+                state.storeCqlCache(cql);
+            }
+            op = buildRunOp(client, cql, null, queryParams, keyid, key);
+        }
+
+        timeWithRetry(op);
+        return op;
+    }
+
+    /**
+     * Runs this operation once against the wrapped client, using a key obtained from getKey().
+     *
+     * @param client wrapper around the concrete client
+     * @throws IOException propagated from the four-argument run()
+     */
+    protected void run(final ClientWrapper client) throws IOException
+    {
+        // NOTE(review): array() exposes the whole backing array; this presumably relies on
+        // getKey() returning an array-backed buffer that covers exactly the key - confirm.
+        final byte[] keyBytes = getKey().array();
+        final List<ByteBuffer> bindValues = getQueryParameters(keyBytes);
+        final ByteBuffer keyBuffer = ByteBuffer.wrap(keyBytes);
+        // the textual key is decoded with the platform default charset
+        final String keyText = new String(keyBytes);
+        run(client, bindValues, keyBuffer, keyText);
+    }
+
+ // Classes to process Cql results
+
+    /**
+     * Run-op that counts as successful whenever the query executes without error.
+     * The key count it reports is fixed at construction time; the row count produced by
+     * RowCountHandler is not consulted for validation.
+     */
+    protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
+    {
+
+        /** Value reported by {@link #keyCount()}. */
+        final int keyCount;
+
+        protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, int keyCount)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
+            this.keyCount = keyCount;
+        }
+
+        /** @return the key count given to the constructor */
+        @Override
+        public int keyCount()
+        {
+            return this.keyCount;
+        }
+
+        /** Accepts every result. */
+        @Override
+        public boolean validate(Integer result)
+        {
+            return true;
+        }
+    }
+
+    /**
+     * Run-op that succeeds only if the query executes without error AND returns at least one row.
+     * The row count produced by RowCountHandler is both the validated value and the key count.
+     */
+    protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
+    {
+
+        protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
+        }
+
+        /**
+         * @param result number of rows returned by the query
+         * @return true only when at least one row came back; an empty result is a failure
+         */
+        @Override
+        public boolean validate(Integer result)
+        {
+            // previously returned true unconditionally, so empty reads were counted as successes
+            return result != null && result > 0;
+        }
+
+        /**
+         * @return the number of rows of the last execution, or 0 if nothing has been executed yet
+         */
+        @Override
+        public int keyCount()
+        {
+            // result stays null until run() has completed; avoid unboxing a null
+            return result == null ? 0 : result;
+        }
+    }
+
+    /**
+     * Run-op that collects the keys of the result set (through KeysHandler) for further processing.
+     * Subclasses supply their own validate() implementation.
+     */
+    protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
+    {
+
+        protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            super(client, query, queryId, KeysHandler.INSTANCE, params, id, key);
+        }
+
+        /**
+         * @return the number of keys fetched by the last execution, or 0 when no result is available
+         */
+        @Override
+        public int keyCount()
+        {
+            // result is null before run() completes, and a handler may yield null for a
+            // response that carries no rows; treat both as "no keys" instead of throwing
+            return result == null ? 0 : result.length;
+        }
+
+    }
+
+    /**
+     * Base run-op for CQL: executes either a prepared statement (when queryId is set) or a
+     * query string through the ClientWrapper, stores the converted result and validates it.
+     */
+    protected abstract class CqlRunOp<V> implements RunOp
+    {
+
+        final ClientWrapper client;
+        final String query;
+        final Object queryId;
+        final List<ByteBuffer> params;
+        final String id;
+        final ByteBuffer key;
+        final ResultHandler<V> handler;
+        /** Converted result of the most recent execution; null until run() has been called. */
+        V result;
+
+        private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<ByteBuffer> params, String id, ByteBuffer key)
+        {
+            this.client = client;
+            this.query = query;
+            this.queryId = queryId;
+            this.handler = handler;
+            this.params = params;
+            this.id = id;
+            this.key = key;
+        }
+
+        /**
+         * Executes the statement once and validates what came back.
+         *
+         * @return the outcome of {@link #validate(Object)} for the fresh result
+         * @throws Exception whatever the client throws while executing
+         */
+        @Override
+        public boolean run() throws Exception
+        {
+            if (queryId != null)
+            {
+                // prepared path: queryId is the statement handle
+                result = client.execute(queryId, key, params, handler);
+            }
+            else
+            {
+                // plain path: the query text is sent
+                result = client.execute(query, key, params, handler);
+            }
+            return validate(result);
+        }
+
+        /** @return the textual key passed to the constructor */
+        @Override
+        public String key()
+        {
+            return id;
+        }
+
+        /**
+         * Decides whether an execution counts as successful.
+         *
+         * @param result converted result of the execution
+         * @return true if the result is acceptable
+         */
+        public abstract boolean validate(V result);
+
+    }
+
+
+ /// LOTS OF WRAPPING/UNWRAPPING NONSENSE
+
+
+    /** Runs this operation through a Thrift client (wrapped according to the CQL version). */
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        final ClientWrapper wrapped = wrap(client);
+        run(wrapped);
+    }
+
+    /** Runs this operation through a SimpleClient. */
+    @Override
+    public void run(SimpleClient client) throws IOException
+    {
+        final ClientWrapper wrapped = wrap(client);
+        run(wrapped);
+    }
+
+    /** Runs this operation through the Java driver client. */
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        final ClientWrapper wrapped = wrap(client);
+        run(wrapped);
+    }
+
+    /**
+     * Wraps a Thrift client; the wrapper type depends on state.isCql3().
+     *
+     * @param client Thrift client to adapt
+     * @return a CQL3 wrapper when state.isCql3() is true, otherwise a CQL2 wrapper
+     */
+    public ClientWrapper wrap(ThriftClient client)
+    {
+        if (state.isCql3())
+        {
+            return new Cql3CassandraClientWrapper(client);
+        }
+        return new Cql2CassandraClientWrapper(client);
+    }
+
+    /** Wraps a Java driver client. */
+    public ClientWrapper wrap(JavaDriverClient client)
+    {
+        final ClientWrapper wrapper = new JavaDriverWrapper(client);
+        return wrapper;
+    }
+
+    /** Wraps a SimpleClient. */
+    public ClientWrapper wrap(SimpleClient client)
+    {
+        final ClientWrapper wrapper = new SimpleClientWrapper(client);
+        return wrapper;
+    }
+
+ /**
+ * Common facade over the different client types, so that run-ops can prepare and execute
+ * statements without knowing which client is underneath.
+ */
+ protected interface ClientWrapper
+ {
+ /** Prepares the given CQL and returns a client-specific handle for it (its type differs per wrapper). */
+ Object createPreparedStatement(String cqlQuery) throws TException;
+ /** Executes a previously prepared statement with the given bind values and converts the response with the handler. */
+ <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
+ /** Executes a query given as text with the given parameters and converts the response with the handler. */
+ <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException;
+ }
+
+    /**
+     * ClientWrapper backed by a JavaDriverClient. The key argument of both execute()
+     * variants is not used by this wrapper.
+     */
+    private final class JavaDriverWrapper implements ClientWrapper
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverWrapper(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        /** Prepares the statement on the driver; the returned handle is later cast to PreparedStatement. */
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery);
+        }
+
+        /** Executes a prepared statement and converts the ResultSet with the handler's java-driver function. */
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            final Function<ResultSet, V> convert = handler.javaDriverHandler();
+            final PreparedStatement statement = (PreparedStatement) preparedStatementId;
+            final ResultSet rows = client.executePrepared(
+                    statement,
+                    queryParams,
+                    ThriftConversion.fromThrift(state.settings.command.consistencyLevel));
+            return convert.apply(rows);
+        }
+
+        /** Inlines the parameters into the query text, executes it and converts the ResultSet. */
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            final String cql = formatCqlQuery(query, queryParams, state.isCql3());
+            final Function<ResultSet, V> convert = handler.javaDriverHandler();
+            final ResultSet rows = client.execute(cql, ThriftConversion.fromThrift(state.settings.command.consistencyLevel));
+            return convert.apply(rows);
+        }
+    }
+
+    /**
+     * ClientWrapper backed by a SimpleClient. Responses are ResultMessage objects and are
+     * converted with the handler's thriftHandler() function. The key argument of both
+     * execute() variants is not used by this wrapper.
+     */
+    private final class SimpleClientWrapper implements ClientWrapper
+    {
+        final SimpleClient client;
+
+        private SimpleClientWrapper(SimpleClient client)
+        {
+            this.client = client;
+        }
+
+        /** Prepares the statement; the handle is the byte[] of the returned statement id. */
+        @Override
+        public Object createPreparedStatement(String cqlQuery)
+        {
+            return client.prepare(cqlQuery).statementId.bytes;
+        }
+
+        /** Executes a prepared statement identified by its byte[] id and converts the response. */
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            final Function<ResultMessage, V> convert = handler.thriftHandler();
+            final byte[] statementId = (byte[]) preparedStatementId;
+            final ResultMessage response = client.executePrepared(
+                    statementId,
+                    queryParams,
+                    ThriftConversion.fromThrift(state.settings.command.consistencyLevel));
+            return convert.apply(response);
+        }
+
+        /** Inlines the parameters into the query text, executes it and converts the response. */
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler)
+        {
+            final String cql = formatCqlQuery(query, queryParams, state.isCql3());
+            final Function<ResultMessage, V> convert = handler.thriftHandler();
+            final ResultMessage response = client.execute(cql, ThriftConversion.fromThrift(state.settings.command.consistencyLevel));
+            return convert.apply(response);
+        }
+    }
+
+    /**
+     * ClientWrapper for CQL3 over a ThriftClient. Responses are CqlResult objects and are
+     * converted with the handler's simpleNativeHandler() function.
+     */
+    private final class Cql3CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+
+        private Cql3CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        /**
+         * Inlines the parameters into the query text and executes the resulting CQL3 statement.
+         *
+         * @param query query text containing '?' markers
+         * @param key key passed through to the Thrift client
+         * @param queryParams values substituted for the markers
+         * @param handler converter for the response
+         * @return the converted response
+         * @throws TException if the Thrift call fails
+         */
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            String formattedQuery = formatCqlQuery(query, queryParams, true);
+            // send the formatted text: the raw query still contains unbound '?' markers
+            return handler.simpleNativeHandler().apply(
+                    client.execute_cql3_query(formattedQuery, key, Compression.NONE, state.settings.command.consistencyLevel)
+            );
+        }
+
+        /**
+         * Executes a prepared CQL3 statement.
+         *
+         * @param preparedStatementId Integer handle as returned by createPreparedStatement()
+         * @param key key passed through to the Thrift client
+         * @param queryParams bind values
+         * @param handler converter for the response
+         * @return the converted response
+         * @throws TException if the Thrift call fails
+         */
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            Integer id = (Integer) preparedStatementId;
+            return handler.simpleNativeHandler().apply(
+                    client.execute_prepared_cql3_query(id, key, queryParams, state.settings.command.consistencyLevel)
+            );
+        }
+
+        /** Prepares the statement without compression and returns the client's handle for it. */
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql3_query(cqlQuery, Compression.NONE);
+        }
+    }
+
+    /**
+     * ClientWrapper for CQL2 over a ThriftClient. Responses are CqlResult objects and are
+     * converted with the handler's simpleNativeHandler() function.
+     */
+    private final class Cql2CassandraClientWrapper implements ClientWrapper
+    {
+        final ThriftClient client;
+
+        private Cql2CassandraClientWrapper(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        /** Prepares the statement without compression and returns the client's handle for it. */
+        @Override
+        public Object createPreparedStatement(String cqlQuery) throws TException
+        {
+            return client.prepare_cql_query(cqlQuery, Compression.NONE);
+        }
+
+        /** Executes a prepared CQL2 statement identified by its Integer handle. */
+        @Override
+        public <V> V execute(Object preparedStatementId, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            final Function<CqlResult, V> convert = handler.simpleNativeHandler();
+            final Integer statementId = (Integer) preparedStatementId;
+            final CqlResult response = client.execute_prepared_cql_query(statementId, key, queryParams);
+            return convert.apply(response);
+        }
+
+        /** Inlines the parameters (CQL2 blob syntax) into the query text and executes it. */
+        @Override
+        public <V> V execute(String query, ByteBuffer key, List<ByteBuffer> queryParams, ResultHandler<V> handler) throws TException
+        {
+            final String cql = formatCqlQuery(query, queryParams, false);
+            final Function<CqlResult, V> convert = handler.simpleNativeHandler();
+            final CqlResult response = client.execute_cql_query(cql, key, Compression.NONE);
+            return convert.apply(response);
+        }
+    }
+
+ // interface for building functions to standardise results from each client
+ // NOTE: the method names do not match the wrappers one might expect from them:
+ // thriftHandler() converts ResultMessage and is used by SimpleClientWrapper, while
+ // simpleNativeHandler() converts CqlResult and is used by the two ThriftClient wrappers.
+ protected static interface ResultHandler<V>
+ {
+ // converts the ResultSet returned by the Java driver client
+ Function<ResultSet, V> javaDriverHandler();
+ // converts the ResultMessage returned by SimpleClient
+ Function<ResultMessage, V> thriftHandler();
+ // converts the CqlResult returned by the Thrift client
+ Function<CqlResult, V> simpleNativeHandler();
+ }
+
+    /**
+     * ResultHandler that reduces every kind of response to a row count.
+     */
+    protected static class RowCountHandler implements ResultHandler<Integer>
+    {
+        static final RowCountHandler INSTANCE = new RowCountHandler();
+
+        /** Counts the rows of a Java driver ResultSet; a null ResultSet counts as 0. */
+        @Override
+        public Function<ResultSet, Integer> javaDriverHandler()
+        {
+            return new Function<ResultSet, Integer>()
+            {
+                @Override
+                public Integer apply(ResultSet rows)
+                {
+                    return rows == null ? 0 : rows.all().size();
+                }
+            };
+        }
+
+        /** Counts the rows of a ResultMessage; anything that is not a Rows message counts as 0. */
+        @Override
+        public Function<ResultMessage, Integer> thriftHandler()
+        {
+            return new Function<ResultMessage, Integer>()
+            {
+                @Override
+                public Integer apply(ResultMessage result)
+                {
+                    if (!(result instanceof ResultMessage.Rows))
+                    {
+                        return 0;
+                    }
+                    final ResultMessage.Rows rowsMessage = (ResultMessage.Rows) result;
+                    return rowsMessage.result.size();
+                }
+            };
+        }
+
+        /**
+         * Counts the rows of a CqlResult of type ROWS; every other result type yields 1.
+         * NOTE(review): the other two handlers yield 0 for responses without rows - confirm
+         * that 1 is intended here.
+         */
+        @Override
+        public Function<CqlResult, Integer> simpleNativeHandler()
+        {
+            return new Function<CqlResult, Integer>()
+            {
+                @Override
+                public Integer apply(CqlResult result)
+                {
+                    final int count;
+                    switch (result.getType())
+                    {
+                        case ROWS:
+                            count = result.getRows().size();
+                            break;
+                        default:
+                            count = 1;
+                            break;
+                    }
+                    return count;
+                }
+            };
+        }
+
+    }
+
+    /**
+     * ResultHandler that turns the response of each client type into an array holding the
+     * key bytes of every returned row. A response without rows yields an empty array.
+     */
+    protected static final class KeysHandler implements ResultHandler<byte[][]>
+    {
+        static final KeysHandler INSTANCE = new KeysHandler();
+
+        /** Extracts column 0 of every row of a Java driver ResultSet; null yields an empty array. */
+        @Override
+        public Function<ResultSet, byte[][]> javaDriverHandler()
+        {
+            return new Function<ResultSet, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultSet result)
+                {
+
+                    if (result == null)
+                        return new byte[0][];
+                    List<Row> rows = result.all();
+                    byte[][] r = new byte[rows.size()][];
+                    // NOTE(review): array() returns the whole backing array; presumably the
+                    // buffers cover exactly the key bytes - confirm
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = rows.get(i).getBytes(0).array();
+                    return r;
+                }
+            };
+        }
+
+        /**
+         * Extracts column 0 of every row of a ResultMessage.Rows. Any other message yields an
+         * empty array (rather than null), consistent with javaDriverHandler(), so callers can
+         * read the length of the result without a null check.
+         */
+        @Override
+        public Function<ResultMessage, byte[][]> thriftHandler()
+        {
+            return new Function<ResultMessage, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(ResultMessage result)
+                {
+                    if (result instanceof ResultMessage.Rows)
+                    {
+                        ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+                        byte[][] r = new byte[rows.result.size()][];
+                        for (int i = 0 ; i < r.length ; i++)
+                            r[i] = rows.result.rows.get(i).get(0).array();
+                        return r;
+                    }
+                    return new byte[0][];
+                }
+            };
+        }
+
+        /** Extracts the key of every row of a Thrift CqlResult. */
+        @Override
+        public Function<CqlResult, byte[][]> simpleNativeHandler()
+        {
+            return new Function<CqlResult, byte[][]>()
+            {
+
+                @Override
+                public byte[][] apply(CqlResult result)
+                {
+                    byte[][] r = new byte[result.getRows().size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                        r[i] = result.getRows().get(i).getKey();
+                    return r;
+                }
+            };
+        }
+
+    }
+
+    /**
+     * Renders a buffer as an unquoted CQL blob literal.
+     *
+     * @param term bytes to render
+     * @param isCQL3 when true the hex digits are prefixed with "0x"; otherwise bare hex is returned
+     * @return the hex representation of the buffer
+     */
+    private static String getUnQuotedCqlBlob(ByteBuffer term, boolean isCQL3)
+    {
+        if (isCQL3)
+        {
+            return "0x" + ByteBufferUtil.bytesToHex(term);
+        }
+        return ByteBufferUtil.bytesToHex(term);
+    }
+
+    /**
+     * Constructs a CQL query string by replacing each '?' in the query, from left to right,
+     * with the blob literal of the corresponding parameter.
+     * <p>
+     * Substitution stops when either the markers or the parameters run out; markers left
+     * without a parameter stay in the text unchanged.
+     *
+     * @param query base query string to format
+     * @param parms parameter values, in marker order
+     * @param isCql3 whether blobs are rendered in CQL3 syntax ("0x" prefix) or as bare hex
+     * @return formatted CQL query string; the unchanged query if it has no marker or there are no parameters
+     */
+    private static String formatCqlQuery(String query, List<ByteBuffer> parms, boolean isCql3)
+    {
+        int marker = query.indexOf('?');
+        if (marker == -1 || parms.isEmpty())
+            return query;
+
+        final StringBuilder result = new StringBuilder();
+        int position = 0;
+
+        for (ByteBuffer parm : parms)
+        {
+            // copy the text before the marker, then the parameter in place of the marker
+            result.append(query, position, marker);
+            result.append(getUnQuotedCqlBlob(parm, isCql3));
+
+            position = marker + 1;
+            // search from the character right after the marker; starting one further on
+            // (as before) skipped a marker that directly follows the previous one
+            marker = query.indexOf('?', position);
+            if (marker == -1)
+                break;
+        }
+
+        // remainder of the query after the last substituted marker
+        result.append(query, position, query.length());
+
+        return result.toString();
+    }
+
+    /**
+     * Surrounds the given string with double quotes when the configured CQL version is CQL3.
+     *
+     * @param string text to wrap
+     * @return the quoted text for CQL3, otherwise the text unchanged
+     */
+    protected String wrapInQuotesIfRequired(String string)
+    {
+        if (state.settings.mode.cqlVersion != CqlVersion.CQL3)
+        {
+            return string;
+        }
+        return "\"" + string + "\"";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index c01767b..467e754 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -21,98 +21,39 @@ package org.apache.cassandra.stress.operations;
*/
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.transport.SimpleClient;
-
-public class CqlRangeSlicer extends CQLOperation
+public class CqlRangeSlicer extends CqlOperation<Integer>
{
- private static String cqlQuery = null;
- private int lastRowCount;
-
- public CqlRangeSlicer(Session client, int idx)
+ public CqlRangeSlicer(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
-
- if (cqlQuery == null)
- {
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
- .append(" ''..'' FROM Standard1");
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
- cqlQuery = query.append(" WHERE KEY > ?").toString();
- }
-
- String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
+ return Collections.singletonList(ByteBuffer.wrap(key));
+ }
- try
- {
- success = executor.execute(cqlQuery, queryParams);
- }
- catch (Exception e)
- {
- System.err.println(e);
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("SELECT FIRST ").append(state.settings.columns.maxColumnsPerKey)
+ .append(" ''..'' FROM ").append(state.settings.schema.columnFamily);
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error executing range slice with offset %s %s%n",
- index,
- session.getRetryTimes(),
- key,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- session.operations.getAndIncrement();
- session.keys.getAndAdd(lastRowCount);
- context.stop();
+ return query.append(" WHERE KEY > ?").toString();
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- lastRowCount = result.rows.size();
- return lastRowCount != 0;
+ return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
}
- protected boolean validateNativeResult(ResultMessage result)
- {
- assert result instanceof ResultMessage.Rows;
- lastRowCount = ((ResultMessage.Rows)result).result.size();
- return lastRowCount != 0;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 70273c1..051fd18 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -21,116 +21,67 @@ package org.apache.cassandra.stress.operations;
*/
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.ThriftConversion;
-
-public class CqlReader extends CQLOperation
+public class CqlReader extends CqlOperation<Integer>
{
- private static String cqlQuery = null;
- public CqlReader(Session client, int idx)
+ public CqlReader(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected String buildQuery()
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
+ StringBuilder query = new StringBuilder("SELECT ");
- if (cqlQuery == null)
+ if (state.settings.columns.names == null)
{
- StringBuilder query = new StringBuilder("SELECT ");
-
- if (session.columnNames == null)
- {
- if (session.cqlVersion.startsWith("2"))
- query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
- else
- query.append("*");
- }
+ if (state.isCql2())
+ query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
else
- {
- for (int i = 0; i < session.columnNames.size(); i++)
- {
- if (i > 0) query.append(",");
- query.append('?');
- }
- }
-
- query.append(" FROM ").append(wrapInQuotesIfRequired("Standard1"));
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
- query.append(" WHERE KEY=?");
-
- cqlQuery = query.toString();
+ query.append("*");
}
-
- List<String> queryParams = new ArrayList<String>();
- if (session.columnNames != null)
- for (int i = 0; i < session.columnNames.size(); i++)
- queryParams.add(getUnQuotedCqlBlob(session.columnNames.get(i).array(), session.cqlVersion.startsWith("3")));
-
- byte[] key = generateKey();
- queryParams.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
+ else
{
- if (success)
- break;
-
- try
- {
- success = executor.execute(cqlQuery, queryParams);
- }
- catch (Exception e)
+ for (int i = 0; i < state.settings.columns.names.size() ; i++)
{
- exceptionMessage = getExceptionMessage(e);
- success = false;
+ if (i > 0)
+ query.append(",");
+ query.append('?');
}
}
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading key %s %s%n with query %s",
- index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")",
- cqlQuery));
- }
+ query.append(" FROM ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
+ query.append(" WHERE KEY=?");
+ return query.toString();
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- return result.rows.get(0).columns.size() != 0;
+ if (state.settings.columns.names != null)
+ {
+ final List<ByteBuffer> queryParams = new ArrayList<>();
+ for (ByteBuffer name : state.settings.columns.names)
+ queryParams.add(name);
+ queryParams.add(ByteBuffer.wrap(key));
+ return queryParams;
+ }
+ return Collections.singletonList(ByteBuffer.wrap(key));
}
- protected boolean validateNativeResult(ResultMessage result)
+ @Override
+ protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
+ return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
deleted file mode 100644
index b7c72a2..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class IndexedRangeSlicer extends Operation
-{
- private static List<ByteBuffer> values = null;
-
- public IndexedRangeSlicer(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- if (values == null)
- values = generateValues();
-
- String format = "%0" + session.getTotalKeysLength() + "d";
- SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false, session.getColumnsPerKey()));
-
- ColumnParent parent = new ColumnParent("Standard1");
- int expectedPerValue = session.getNumKeys() / values.size();
-
- ByteBuffer columnName = ByteBufferUtil.bytes("C1");
-
- int received = 0;
-
- String startOffset = String.format(format, 0);
- ByteBuffer value = values.get(1); // only C1 column is indexed
-
- IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
-
- while (received < expectedPerValue)
- {
- IndexClause clause = new IndexClause(Arrays.asList(expression),
- ByteBufferUtil.bytes(startOffset),
- session.getKeysPerCall());
-
- List<KeySlice> results = null;
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
- index,
- session.getRetryTimes(),
- startOffset,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- received += results.size();
-
- // convert max key found back to an integer, and increment it
- startOffset = String.format(format, (1 + getMaxKey(results)));
-
- session.operations.getAndIncrement();
- session.keys.getAndAdd(results.size());
- context.stop();
- }
- }
-
- /**
- * Get maximum key from keySlice list
- * @param keySlices list of the KeySlice objects
- * @return maximum key value of the list
- */
- private int getMaxKey(List<KeySlice> keySlices)
- {
- byte[] firstKey = keySlices.get(0).getKey();
- int maxKey = ByteBufferUtil.toInt(ByteBuffer.wrap(firstKey));
-
- for (KeySlice k : keySlices)
- {
- int currentKey = ByteBufferUtil.toInt(ByteBuffer.wrap(k.getKey()));
-
- if (currentKey > maxKey)
- {
- maxKey = currentKey;
- }
- }
-
- return maxKey;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
deleted file mode 100644
index cbf6b98..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class Inserter extends Operation
-{
- private static List<ByteBuffer> values;
-
- public Inserter(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- if (values == null)
- values = generateValues();
-
- List<Column> columns = new ArrayList<Column>(session.getColumnsPerKey());
- List<SuperColumn> superColumns = null;
-
- // format used for keys
- String format = "%0" + session.getTotalKeysLength() + "d";
-
- for (int i = 0; i < session.getColumnsPerKey(); i++)
- {
- columns.add(new Column(columnName(i, session.timeUUIDComparator))
- .setValue(values.get(i % values.size()))
- .setTimestamp(FBUtilities.timestampMicros()));
- }
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- superColumns = new ArrayList<SuperColumn>();
- // supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
- for (int i = 0; i < session.getSuperColumns(); i++)
- {
- String superColumnName = "S" + Integer.toString(i);
- superColumns.add(new SuperColumn(ByteBufferUtil.bytes(superColumnName), columns));
- }
- }
-
- String rawKey = String.format(format, index);
- Map<String, List<Mutation>> row = session.getColumnFamilyType() == ColumnFamilyType.Super
- ? getSuperColumnsMutationMap(superColumns)
- : getColumnsMutationMap(columns);
- Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(ByteBufferUtil.bytes(rawKey), row);
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- success = true;
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
- index,
- session.getRetryTimes(),
- rawKey,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
-
- private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)
- {
- List<Mutation> mutations = new ArrayList<Mutation>(superColumns.size());
- for (SuperColumn s : superColumns)
- {
- ColumnOrSuperColumn superColumn = new ColumnOrSuperColumn().setSuper_column(s);
- mutations.add(new Mutation().setColumn_or_supercolumn(superColumn));
- }
-
- return Collections.singletonMap("Super1", mutations);
- }
-
- private Map<String, List<Mutation>> getColumnsMutationMap(List<Column> columns)
- {
- List<Mutation> mutations = new ArrayList<Mutation>(columns.size());
- for (Column c : columns)
- {
- ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
- mutations.add(new Mutation().setColumn_or_supercolumn(column));
- }
-
- return Collections.singletonMap("Standard1", mutations);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
deleted file mode 100644
index 12a39fb..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class MultiGetter extends Operation
-{
- public MultiGetter(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false, session.getColumnsPerKey()));
-
- int offset = index * session.getKeysPerThread();
- Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes("S" + j));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndAdd(keys.size());
- context.stop();
-
- offset += session.getKeysPerCall();
- }
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
-
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndAdd(keys.size());
- context.stop();
-
- offset += session.getKeysPerCall();
- }
- }
-
- private List<ByteBuffer> generateKeys(int start, int limit)
- {
- List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-
- for (int i = start; i < limit; i++)
- {
- keys.add(ByteBuffer.wrap(generateKey()));
- }
-
- return keys;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
deleted file mode 100644
index f9ba115..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class RangeSlicer extends Operation
-{
-
- public RangeSlicer(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- String format = "%0" + session.getTotalKeysLength() + "d";
-
- // initial values
- int count = session.getColumnsPerKey();
-
- SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- count));
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- ByteBuffer start = ByteBufferUtil.bytes(String.format(format, index));
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
- for (int i = 0; i < session.getSuperColumns(); i++)
- {
- String superColumnName = "S" + Integer.toString(i);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes(superColumnName));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
- success = (slices.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
- index,
- session.getRetryTimes(),
- ByteBufferUtil.string(start),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- context.stop();
- }
-
- session.keys.getAndAdd(slices.size());
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
-
- ByteBuffer start = ByteBufferUtil.bytes(String.format(format, index));
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
- success = (slices.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
- index,
- session.getRetryTimes(),
- ByteBufferUtil.string(start),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndAdd(slices.size());
- context.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
deleted file mode 100644
index 72d09b4..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-public class Reader extends Operation
-{
- public Reader(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- // initialize SlicePredicate with existing SliceRange
- SlicePredicate predicate = new SlicePredicate();
-
- if (session.columnNames == null)
- predicate.setSlice_range(getSliceRange());
- else // see CASSANDRA-3064 about why this is useful
- predicate.setColumn_names(session.columnNames);
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- runSuperColumnReader(predicate, client);
- }
- else
- {
- runColumnReader(predicate, client);
- }
- }
-
- private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
- {
- byte[] rawKey = generateKey();
- ByteBuffer key = ByteBuffer.wrap(rawKey);
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- String superColumn = 'S' + Integer.toString(j);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes(UTF_8));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
- success = (columns.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
- index,
- session.getRetryTimes(),
- new String(rawKey),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
- }
-
- private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
- {
- ColumnParent parent = new ColumnParent("Standard1");
-
- byte[] key = generateKey();
- ByteBuffer keyBuffer = ByteBuffer.wrap(key);
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
- success = (columns.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
- index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
-
- private SliceRange getSliceRange()
- {
- return new SliceRange()
- .setStart(new byte[] {})
- .setFinish(new byte[] {})
- .setReversed(false)
- .setCount(session.getColumnsPerKey());
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
new file mode 100644
index 0000000..b1657b2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+
+/**
+ * Stress operation that writes a counter value of 1 to each column of one row
+ * through Thrift {@code batch_mutate}.
+ */
+public class ThriftCounterAdder extends Operation
+{
+    /**
+     * @throws IllegalStateException if the column settings request a variable column count
+     */
+    public ThriftCounterAdder(State state, long index)
+    {
+        super(state, index);
+        if (state.settings.columns.variableColumnCount)
+            throw new IllegalStateException("Variable column counts not supported for counters");
+    }
+
+    /**
+     * Builds the counter columns and packages them as one mutation per column or, when
+     * super columns are enabled, as one counter super column per configured column
+     * parent, then submits the row through {@code timeWithRetry}.
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        List<CounterColumn> columns = new ArrayList<CounterColumn>();
+        for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
+            columns.add(new CounterColumn(getColumnNameBytes(i), 1L));
+
+        Map<String, List<Mutation>> row;
+        if (state.settings.columns.useSuperColumns)
+        {
+            // every super column carries the same list of counter columns
+            List<Mutation> mutations = new ArrayList<>();
+            for (ColumnParent parent : state.columnParents)
+            {
+                CounterSuperColumn csc = new CounterSuperColumn(ByteBuffer.wrap(parent.getSuper_column()), columns);
+                ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_super_column(csc);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("SuperCounter1", mutations);
+        }
+        else
+        {
+            List<Mutation> mutations = new ArrayList<>(columns.size());
+            for (CounterColumn c : columns)
+            {
+                ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("Counter1", mutations);
+        }
+
+        final ByteBuffer key = getKey();
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, state.settings.command.consistencyLevel);
+                return true;
+            }
+
+            @Override
+            public String key()
+            {
+                // decode only the buffer's remaining bytes, with a fixed charset, so the
+                // reported key does not vary with the platform default encoding
+                return new String(key.array(), key.arrayOffset() + key.position(), key.remaining(), StandardCharsets.UTF_8);
+            }
+
+            @Override
+            public int keyCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
new file mode 100644
index 0000000..8567edd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+
+/**
+ * Stress operation that reads the columns of one row with a Thrift
+ * {@code get_slice} call per configured column parent.
+ */
+public class ThriftCounterGetter extends Operation
+{
+    /**
+     * @throws IllegalStateException if the column settings request a variable column count
+     */
+    public ThriftCounterGetter(State state, long index)
+    {
+        super(state, index);
+        if (state.settings.columns.variableColumnCount)
+            throw new IllegalStateException("Variable column counts not supported for counters");
+    }
+
+    /**
+     * Slices up to {@code maxColumnsPerKey} columns of the key under each column
+     * parent, running every slice through {@code timeWithRetry}.
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        // empty start/finish bounds, forward order, at most maxColumnsPerKey columns
+        final SliceRange sliceRange = new SliceRange()
+                .setStart(new byte[] {})
+                .setFinish(new byte[] {})
+                .setReversed(false)
+                .setCount(state.settings.columns.maxColumnsPerKey);
+        final SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
+
+        final ByteBuffer key = getKey();
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                @Override
+                public boolean run() throws Exception
+                {
+                    // report false when the slice comes back empty
+                    return client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size() != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    // decode only the buffer's remaining bytes, with a fixed charset, so the
+                    // reported key does not vary with the platform default encoding
+                    return new String(key.array(), key.arrayOffset() + key.position(), key.remaining(), StandardCharsets.UTF_8);
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return 1;
+                }
+            });
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
new file mode 100644
index 0000000..c6b1b03
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
@@ -0,0 +1,141 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Stress operation that pages through Thrift {@code get_indexed_slices} results for
+ * rows whose column 1 equals the generated value for that column.
+ */
+public class ThriftIndexedRangeSlicer extends Operation
+{
+    /**
+     * @throws IllegalStateException if the row or key generator is not deterministic, if
+     *         super columns are enabled or there is not exactly one column parent, or if
+     *         column names use the TimeUUID comparator
+     */
+    public ThriftIndexedRangeSlicer(State state, long index)
+    {
+        super(state, index);
+        if (!state.rowGen.isDeterministic() || !state.keyGen.isDeterministic())
+            throw new IllegalStateException("Only run with a isDeterministic row/key generator");
+        if (state.settings.columns.useSuperColumns || state.columnParents.size() != 1)
+            throw new IllegalStateException("Does not support super columns");
+        if (state.settings.columns.useTimeUUIDComparator)
+            throw new IllegalStateException("Does not support TimeUUID column names");
+    }
+
+    /**
+     * Fetches pages of {@code keysAtOnce} rows, moving the start key just past the
+     * largest key returned, until a page comes back empty.
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        final SlicePredicate predicate = new SlicePredicate()
+                .setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                               false, state.settings.columns.maxColumnsPerKey));
+        final List<ByteBuffer> columns = generateColumnValues();
+        final ColumnParent parent = state.columnParents.get(0);
+
+        final ByteBuffer columnName = getColumnNameBytes(1);
+        final ByteBuffer value = columns.get(1); // only C1 column is indexed
+
+        IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
+        byte[] minKey = new byte[0];
+        // single-element holder so the anonymous RunOp can hand each page back
+        final List<KeySlice>[] results = new List[1];
+        do
+        {
+            // only a page requested with an empty start key must return rows to report true
+            final boolean first = minKey.length == 0;
+            final IndexClause clause = new IndexClause(Arrays.asList(expression),
+                                                       ByteBuffer.wrap(minKey),
+                                                       ((SettingsCommandMulti) state.settings.command).keysAtOnce);
+
+            timeWithRetry(new RunOp()
+            {
+                @Override
+                public boolean run() throws Exception
+                {
+                    results[0] = client.get_indexed_slices(parent, clause, predicate, state.settings.command.consistencyLevel);
+                    return !first || results[0].size() > 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    // decode only the buffer's remaining bytes, with a fixed charset, so the
+                    // reported value does not vary with the platform default encoding
+                    return new String(value.array(), value.arrayOffset() + value.position(), value.remaining(), StandardCharsets.UTF_8);
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return results[0].size();
+                }
+            });
+
+            minKey = getNextMinKey(minKey, results[0]);
+
+        } while (results[0].size() > 0);
+    }
+
+    /**
+     * Computes the start key for the next page: the largest of {@code cur} and the returned
+     * row keys, as ordered by {@code FBUtilities.compareUnsigned}, incremented by one.
+     *
+     * @param cur start key of the page just fetched
+     * @param slices rows returned for that page
+     * @return a new array holding the incremented maximum; {@code cur} and the slice keys
+     *         are left unmodified
+     */
+    private static byte[] getNextMinKey(byte[] cur, List<KeySlice> slices)
+    {
+        // find max
+        byte[] max = cur;
+        for (KeySlice slice : slices)
+        {
+            byte[] candidate = slice.getKey();
+            if (FBUtilities.compareUnsigned(max, candidate) < 0)
+                max = candidate;
+        }
+
+        // increment a copy, carrying from the last byte towards the first, so the
+        // result follows the maximum in the same byte order used for the comparison
+        byte[] next = Arrays.copyOf(max, max.length);
+        for (int i = next.length - 1 ; i >= 0 ; i--)
+            if (++next[i] != 0)
+                break;
+        return next;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
new file mode 100644
index 0000000..c5f8051
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Stress operation that writes a single row through the Thrift batch_mutate call.
+ */
+public final class ThriftInserter extends Operation
+{
+
+    public ThriftInserter(State state, long index)
+    {
+        super(state, index);
+    }
+
+    /**
+     * Generates the columns for this operation's key, wraps them in mutations (plain columns, or
+     * one super column per configured column parent) and submits them through timeWithRetry.
+     *
+     * @param client the Thrift client used to issue batch_mutate
+     * @throws IOException if the operation cannot be completed
+     */
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        final ByteBuffer key = getKey();
+        final List<Column> columns = generateColumns();
+
+        Map<String, List<Mutation>> row;
+        if (!state.settings.columns.useSuperColumns)
+        {
+            // standard column family: one mutation per generated column
+            List<Mutation> mutations = new ArrayList<>(columns.size());
+            for (Column c : columns)
+            {
+                ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
+                mutations.add(new Mutation().setColumn_or_supercolumn(column));
+            }
+            row = Collections.singletonMap(state.settings.schema.columnFamily, mutations);
+        }
+        else
+        {
+            // super column family: every column parent receives the full set of generated columns
+            List<Mutation> mutations = new ArrayList<>(state.columnParents.size());
+            for (ColumnParent parent : state.columnParents)
+            {
+                final SuperColumn s = new SuperColumn(parent.bufferForSuper_column(), columns);
+                final ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setSuper_column(s);
+                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+            }
+            row = Collections.singletonMap("Super1", mutations);
+        }
+
+        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
+
+        timeWithRetry(new RunOp()
+        {
+            @Override
+            public boolean run() throws Exception
+            {
+                client.batch_mutate(record, state.settings.command.consistencyLevel);
+                return true;
+            }
+
+            @Override
+            public String key()
+            {
+                return new String(key.array());
+            }
+
+            @Override
+            public int keyCount()
+            {
+                return 1;
+            }
+        });
+    }
+
+    /**
+     * Builds one Column per generated value, named either with a fresh time UUID or with the
+     * positional column name, and stamps each with its value and the current timestamp.
+     *
+     * @return the populated columns, in the same order as the generated values
+     */
+    protected List<Column> generateColumns()
+    {
+        final List<ByteBuffer> values = generateColumnValues();
+        final List<Column> columns = new ArrayList<>(values.size());
+
+        if (state.settings.columns.useTimeUUIDComparator)
+            // each column must be added to the list: the loop below looks it up by index
+            for (int i = 0 ; i < values.size() ; i++)
+                columns.add(new Column(TimeUUIDType.instance.decompose(UUIDGen.getTimeUUID())));
+        else
+            // TODO : consider randomly allocating column names in case where have fewer than max columns
+            // but need to think about implications for indexes / indexed range slicer / other knock on effects
+            for (int i = 0 ; i < values.size() ; i++)
+                columns.add(new Column(getColumnNameBytes(i)));
+
+        for (int i = 0 ; i < values.size() ; i++)
+            columns.get(i)
+                    .setValue(values.get(i))
+                    .setTimestamp(FBUtilities.timestampMicros());
+
+        return columns;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
new file mode 100644
index 0000000..01c7325
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Stress operation that reads a batch of keys with one multiget_slice call per column parent.
+ */
+public final class ThriftMultiGetter extends Operation
+{
+
+    public ThriftMultiGetter(State state, long index)
+    {
+        super(state, index);
+    }
+
+    /**
+     * Fetches keysAtOnce keys from every configured column parent, timing each call through
+     * timeWithRetry. A call counts as successful when it returns at least one row.
+     *
+     * @param client the Thrift client used to issue multiget_slice
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        // forward slice with open start and finish, capped at maxColumnsPerKey columns
+        final SliceRange allColumns = new SliceRange(
+                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                false,
+                state.settings.columns.maxColumnsPerKey);
+        final SlicePredicate predicate = new SlicePredicate().setSlice_range(allColumns);
+
+        final int keysPerCall = ((SettingsCommandMulti) state.settings.command).keysAtOnce;
+        final List<ByteBuffer> keys = getKeys(keysPerCall);
+
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                // number of rows returned by the most recent attempt
+                int fetched;
+
+                @Override
+                public boolean run() throws Exception
+                {
+                    fetched = client.multiget_slice(keys, parent, predicate, state.settings.command.consistencyLevel).size();
+                    return fetched != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return keys.toString();
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return fetched;
+                }
+            });
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
new file mode 100644
index 0000000..ce6c8cd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftRangeSlicer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Stress operation that issues a get_range_slices call per column parent, starting at a generated key.
+ */
+public final class ThriftRangeSlicer extends Operation
+{
+
+    public ThriftRangeSlicer(State state, long index)
+    {
+        super(state, index);
+    }
+
+    /**
+     * Requests up to keysAtOnce rows beginning at this operation's key, for every configured
+     * column parent, timing each call through timeWithRetry. A call counts as successful when it
+     * returns at least one row.
+     *
+     * @param client the Thrift client used to issue get_range_slices
+     */
+    @Override
+    public void run(final ThriftClient client) throws IOException
+    {
+        // forward slice with open start and finish, capped at maxColumnsPerKey columns
+        final SliceRange allColumns = new SliceRange(
+                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                false,
+                state.settings.columns.maxColumnsPerKey);
+        final SlicePredicate predicate = new SlicePredicate().setSlice_range(allColumns);
+
+        final ByteBuffer start = getKey();
+
+        // open-ended key range; the count given to the constructor is replaced by setCount below
+        final KeyRange range = new KeyRange(state.settings.columns.maxColumnsPerKey);
+        range.setStart_key(start);
+        range.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        range.setCount(((SettingsCommandMulti) state.settings.command).keysAtOnce);
+
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                // number of rows returned by the most recent attempt
+                private int fetched = 0;
+
+                @Override
+                public boolean run() throws Exception
+                {
+                    fetched = client.get_range_slices(parent, predicate, range, state.settings.command.consistencyLevel).size();
+                    return fetched != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    final ByteBuffer startKey = range.bufferForStart_key();
+                    return new String(startKey.array());
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return fetched;
+                }
+            });
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
new file mode 100644
index 0000000..a8605e8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+
+/**
+ * Stress operation that reads a single key with one get_slice call per column parent.
+ */
+public final class ThriftReader extends Operation
+{
+
+    public ThriftReader(State state, long index)
+    {
+        super(state, index);
+    }
+
+    /**
+     * Reads this operation's key from every configured column parent, timing each call through
+     * timeWithRetry. A call counts as successful when it returns at least one column.
+     *
+     * @param client the Thrift client used to issue get_slice
+     */
+    public void run(final ThriftClient client) throws IOException
+    {
+        final SlicePredicate predicate = new SlicePredicate();
+        if (state.settings.columns.names != null)
+        {
+            // see CASSANDRA-3064 about why this is useful
+            predicate.setColumn_names(state.settings.columns.names);
+        }
+        else
+        {
+            // no explicit names: forward slice with empty start and finish, capped at maxColumnsPerKey
+            final SliceRange allColumns = new SliceRange();
+            allColumns.setStart(new byte[0]);
+            allColumns.setFinish(new byte[0]);
+            allColumns.setReversed(false);
+            allColumns.setCount(state.settings.columns.maxColumnsPerKey);
+            predicate.setSlice_range(allColumns);
+        }
+
+        final ByteBuffer key = getKey();
+        for (final ColumnParent parent : state.columnParents)
+        {
+            timeWithRetry(new RunOp()
+            {
+                @Override
+                public boolean run() throws Exception
+                {
+                    final int found = client.get_slice(key, parent, predicate, state.settings.command.consistencyLevel).size();
+                    return found != 0;
+                }
+
+                @Override
+                public String key()
+                {
+                    return new String(key.array());
+                }
+
+                @Override
+                public int keyCount()
+                {
+                    return 1;
+                }
+            });
+        }
+    }
+
+}