You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/06/30 14:11:14 UTC
git commit: (stress) support for CQL prepared statements patch by
David Alves; reviewed by Pavel Yaskevich for CASSANDRA-3633
Updated Branches:
refs/heads/cassandra-1.1 b94d8d40f -> b1c60d2b3
(stress) support for CQL prepared statements
patch by David Alves; reviewed by Pavel Yaskevich for CASSANDRA-3633
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c60d2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c60d2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c60d2b
Branch: refs/heads/cassandra-1.1
Commit: b1c60d2b33815d7ba2136b5c3318f7dbae3ee062
Parents: b94d8d4
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Sat Jun 30 14:52:55 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Sat Jun 30 15:09:46 2012 +0300
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../src/org/apache/cassandra/stress/Session.java | 26 ++++-
.../org/apache/cassandra/stress/StressAction.java | 4 +-
.../cassandra/stress/operations/CounterAdder.java | 3 +-
.../cassandra/stress/operations/CounterGetter.java | 3 +-
.../stress/operations/CqlCounterAdder.java | 47 +++++++---
.../stress/operations/CqlCounterGetter.java | 38 ++++++--
.../stress/operations/CqlIndexedRangeSlicer.java | 44 +++++---
.../cassandra/stress/operations/CqlInserter.java | 50 +++++++---
.../stress/operations/CqlMultiGetter.java | 4 +-
.../stress/operations/CqlRangeSlicer.java | 38 ++++++--
.../cassandra/stress/operations/CqlReader.java | 61 ++++++++---
.../stress/operations/IndexedRangeSlicer.java | 3 +-
.../cassandra/stress/operations/Inserter.java | 3 +-
.../cassandra/stress/operations/MultiGetter.java | 3 +-
.../cassandra/stress/operations/RangeSlicer.java | 3 +-
.../apache/cassandra/stress/operations/Reader.java | 3 +-
.../cassandra/stress/util/CassandraClient.java | 34 +++++++
.../apache/cassandra/stress/util/Operation.java | 77 +++++++++++++--
19 files changed, 352 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eda806c..25d9784 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+1.1.3
+ * (stress) support for CQL prepared statements (CASSANDRA-3633)
+
+
1.1.2
* Fix cleanup not deleting index entries (CASSANDRA-4379)
* Use correct partitioner when saving + loading caches (CASSANDRA-4331)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 5455e67..dbe1951 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.*;
import org.apache.commons.cli.*;
import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.thrift.*;
import org.apache.commons.lang.StringUtils;
@@ -80,6 +81,7 @@ public class Session implements Serializable
availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000");
availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1");
availableOptions.addOption("L", "enable-cql", false, "Perform queries using CQL (Cassandra Query Language).");
+ availableOptions.addOption("P", "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)");
availableOptions.addOption("R", "replication-strategy", true, "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
@@ -114,6 +116,7 @@ public class Session implements Serializable
private boolean replicateOnWrite = true;
private boolean ignoreErrors = false;
private boolean enable_cql = false;
+ private boolean use_prepared = false;
private final String outFileName;
@@ -265,6 +268,16 @@ public class Session implements Serializable
if (cmd.hasOption("L"))
enable_cql = true;
+ if (cmd.hasOption("P"))
+ {
+ if (!enable_cql)
+ {
+ System.err.println("-P/--use-prepared-statements is only applicable with CQL (-L/--enable-cql)");
+ System.exit(-1);
+ }
+ use_prepared = true;
+ }
+
if (cmd.hasOption("O"))
{
String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
@@ -500,6 +513,11 @@ public class Session implements Serializable
return enable_cql;
}
+ public boolean usePreparedStatements()
+ {
+ return use_prepared;
+ }
+
/**
* Create Keyspace1 with Standard1 and Super1 column families
*/
@@ -556,7 +574,7 @@ public class Session implements Serializable
keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef)));
- Cassandra.Client client = getClient(false);
+ CassandraClient client = getClient(false);
try
{
@@ -578,7 +596,7 @@ public class Session implements Serializable
* Thrift client connection with Keyspace1 set.
* @return cassandra client connection
*/
- public Cassandra.Client getClient()
+ public CassandraClient getClient()
{
return getClient(true);
}
@@ -587,14 +605,14 @@ public class Session implements Serializable
* @param setKeyspace - should we set keyspace for client or not
* @return cassandra client connection
*/
- public Cassandra.Client getClient(boolean setKeyspace)
+ public CassandraClient getClient(boolean setKeyspace)
{
// random node selection for fake load balancing
String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
TSocket socket = new TSocket(currentNode, port);
TTransport transport = (isUnframed()) ? socket : new TFramedTransport(socket);
- Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
+ CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index ac774b7..1227fe8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -22,8 +22,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.apache.cassandra.stress.operations.*;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
public class StressAction extends Thread
{
@@ -215,7 +215,7 @@ public class StressAction extends Thread
public void run()
{
- Cassandra.Client connection = client.getClient();
+ CassandraClient connection = client.getClient();
for (int i = 0; i < items; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
index 0c80f0a..0420154 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -37,7 +38,7 @@ public class CounterAdder extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
List<CounterColumn> columns = new ArrayList<CounterColumn>();
List<CounterSuperColumn> superColumns = new ArrayList<CounterSuperColumn>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
index 3d8b1fd..a06298d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -33,7 +34,7 @@ public class CounterGetter extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
SliceRange sliceRange = new SliceRange();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index fa82d57..7197eaa 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -23,39 +23,48 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CqlCounterAdder extends Operation
{
+ private static String cqlQuery = null;
+
public CqlCounterAdder(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
- StringBuilder query = new StringBuilder(
- "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET ");
-
- for (int i = 0; i < session.getColumnsPerKey(); i++)
+ if (cqlQuery == null)
{
- if (i > 0)
- query.append(",");
- query.append('C').append(i).append("=C").append(i).append("+1");
+ StringBuilder query = new StringBuilder(
+ "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET ");
+
+ for (int i = 0; i < session.getColumnsPerKey(); i++)
+ {
+ if (i > 0)
+ query.append(",");
+
+ query.append('C').append(i).append("=C").append(i).append("+1");
+
+ }
+ query.append(" WHERE KEY=?");
+ cqlQuery = query.toString();
}
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- query.append( " WHERE KEY=").append(getQuotedCqlBlob(key.getBytes(UTF_8)));
+ String formattedQuery = null;
long start = System.currentTimeMillis();
@@ -69,7 +78,19 @@ public class CqlCounterAdder extends Operation
try
{
- client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE);
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ client.execute_prepared_cql_query(stmntId,
+ Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+ client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+ }
+
success = true;
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index 1044c6d..1133747 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -23,33 +23,41 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CqlCounterGetter extends Operation
{
+ private static String cqlQuery = null;
public CqlCounterGetter(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
+ if (cqlQuery == null)
+ {
+ StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+ .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
+ .append(" WHERE KEY=?");
+ cqlQuery = query.toString();
+ }
+
byte[] key = generateKey();
- String hexKey = getQuotedCqlBlob(key);
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
- .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
- .append(" WHERE KEY=").append(hexKey);
+ String formattedQuery = null;
long start = System.currentTimeMillis();
@@ -63,8 +71,22 @@ public class CqlCounterGetter extends Operation
try
{
- CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
- Compression.NONE);
+ CqlResult result = null;
+
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ result = client.execute_prepared_cql_query(stmntId,
+ Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+ result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()),
+ Compression.NONE);
+ }
+
assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type";
assert result.rows.size() == 0 : "expected exactly one row";
success = (result.rows.get(0).columns.size() != 0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index 978c1c4..383ad67 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -23,30 +23,29 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.utils.ByteBufferUtil;
-import static org.apache.cassandra.utils.Hex.bytesToHex;;
-
public class CqlIndexedRangeSlicer extends Operation
{
private static List<ByteBuffer> values = null;
- private static String clauseFragment = "KEY > '%s' LIMIT %d";
+ private static String cqlQuery = null;
public CqlIndexedRangeSlicer(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -54,12 +53,18 @@ public class CqlIndexedRangeSlicer extends Operation
if (values == null)
values = generateValues();
- String format = "%0" + session.getTotalKeysLength() + "d";
+ if (cqlQuery == null)
+ {
+ StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+ .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel())
+ .append(" WHERE C1=").append(getUnQuotedCqlBlob(values.get(1).array()))
+ .append(" AND KEY > ? LIMIT ").append(session.getKeysPerCall());
+
+ cqlQuery = query.toString();
+ }
+ String format = "%0" + session.getTotalKeysLength() + "d";
String startOffset = String.format(format, 0);
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
- .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel())
- .append(" WHERE C1 = ").append(getQuotedCqlBlob(values.get(1).array())).append(" AND ");
int expectedPerValue = session.getNumKeys() / values.size(), received = 0;
@@ -70,6 +75,8 @@ public class CqlIndexedRangeSlicer extends Operation
boolean success = false;
String exceptionMessage = null;
CqlResult results = null;
+ String formattedQuery = null;
+ List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset));
for (int t = 0; t < session.getRetryTimes(); t++)
{
@@ -78,8 +85,18 @@ public class CqlIndexedRangeSlicer extends Operation
try
{
- ByteBuffer queryBytes = ByteBuffer.wrap(makeQuery(query, startOffset).getBytes());
- results = client.execute_cql_query(queryBytes, Compression.NONE);
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, queryParms);
+ results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+ }
+
success = (results.rows.size() != 0);
}
catch (Exception e)
@@ -109,11 +126,6 @@ public class CqlIndexedRangeSlicer extends Operation
}
}
- private String makeQuery(StringBuilder base, String startOffset)
- {
- return base.toString() + String.format(clauseFragment, bytesToHex(startOffset.getBytes()), session.getKeysPerCall());
- }
-
/**
* Get maximum key from CqlRow list
* @param rows list of the CqlRow objects
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index d7a7641..c729f2f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -23,25 +23,27 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.utils.UUIDGen;
public class CqlInserter extends Operation
{
private static List<ByteBuffer> values;
+ private static String cqlQuery = null;
public CqlInserter(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
@@ -49,26 +51,39 @@ public class CqlInserter extends Operation
if (values == null)
values = generateValues();
- StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ")
- .append(session.getConsistencyLevel().toString()).append(" SET ");
+ // Construct a query string once.
+ if (cqlQuery == null)
+ {
+ StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ")
+ .append(session.getConsistencyLevel().toString()).append(" SET ");
+ for (int i = 0; i < session.getColumnsPerKey(); i++)
+ {
+ if (i > 0) query.append(',');
+ query.append("?=?");
+ }
+
+ query.append(" WHERE KEY=?");
+ cqlQuery = query.toString();
+ }
+
+ List<String> queryParms = new ArrayList<String>();
for (int i = 0; i < session.getColumnsPerKey(); i++)
{
- if (i > 0)
- query.append(',');
-
// Column name
if (session.timeUUIDComparator)
- query.append(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString());
+ queryParms.add(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString());
else
- query.append('C').append(i);
+ queryParms.add(new String("C" + i));
// Column value
- query.append('=').append(getQuotedCqlBlob(values.get(i % values.size()).array()));
+ queryParms.add(new String(getUnQuotedCqlBlob(values.get(i % values.size()).array())));
}
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- query.append(" WHERE KEY=").append(getQuotedCqlBlob(key));
+ queryParms.add(new String(getUnQuotedCqlBlob(key)));
+
+ String formattedQuery = null;
long start = System.currentTimeMillis();
@@ -82,7 +97,18 @@ public class CqlInserter extends Operation
try
{
- client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE);
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, queryParms);
+ client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+ }
+
success = true;
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
index 3125cff..e9b1f47 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -24,8 +24,8 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
public class CqlMultiGetter extends Operation
{
@@ -34,7 +34,7 @@ public class CqlMultiGetter extends Operation
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
throw new RuntimeException("Multiget is not implemented for CQL");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
index e57a9ac..8b20867 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
@@ -23,30 +23,41 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
public class CqlRangeSlicer extends Operation
{
+ private static String cqlQuery = null;
+
public CqlRangeSlicer(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
+ if (cqlQuery == null)
+ {
+ StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
+ .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
+ .append(" WHERE KEY > ?");
+ cqlQuery = query.toString();
+ }
+
String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey())
- .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString())
- .append(" WHERE KEY > ").append(getQuotedCqlBlob(key));
+ String formattedQuery = null;
long startTime = System.currentTimeMillis();
@@ -61,8 +72,21 @@ public class CqlRangeSlicer extends Operation
try
{
- CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
- Compression.NONE);
+ CqlResult result = null;
+
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ result = client.execute_prepared_cql_query(stmntId,
+ Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key))));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key)));
+ result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
+ }
+
rowCount = result.rows.size();
success = (rowCount != 0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 93a5c79..cfac2d6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -23,46 +23,60 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
public class CqlReader extends Operation
{
+ private static String cqlQuery = null;
+
public CqlReader(Session client, int idx)
{
super(client, idx);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
throw new RuntimeException("Super columns are not implemented for CQL");
- StringBuilder query = new StringBuilder("SELECT ");
-
- if (session.columnNames == null)
- {
- query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
- }
- else
+ if (cqlQuery == null)
{
- for (int i = 0; i < session.columnNames.size(); i++)
+ StringBuilder query = new StringBuilder("SELECT ");
+
+ if (session.columnNames == null)
+ query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
+ else
{
- if (i > 0)
- query.append(",");
- query.append('\'').append(new String(session.columnNames.get(i).array())).append('\'');
+ for (int i = 0; i < session.columnNames.size(); i++)
+ {
+ if (i > 0) query.append(",");
+ query.append('?');
+ }
}
+
+ query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
+ query.append(" WHERE KEY=?");
+
+ cqlQuery = query.toString();
}
+ List<String> queryParams = new ArrayList<String>();
+ if (session.columnNames != null)
+ for (int i = 0; i < session.columnNames.size(); i++)
+ queryParams.add(getUnQuotedCqlBlob(session.columnNames.get(i).array()));
+
byte[] key = generateKey();
+ queryParams.add(getUnQuotedCqlBlob(key));
- query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
- query.append(" WHERE KEY=").append(getQuotedCqlBlob(key));
+ String formattedQuery = null;
long start = System.currentTimeMillis();
@@ -76,8 +90,21 @@ public class CqlReader extends Operation
try
{
- CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()),
- Compression.NONE);
+ CqlResult result = null;
+
+ if (session.usePreparedStatements())
+ {
+ Integer stmntId = getPreparedStatement(client, cqlQuery);
+ result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
+ }
+ else
+ {
+ if (formattedQuery == null)
+ formattedQuery = formatCqlQuery(cqlQuery, queryParams);
+ result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()),
+ Compression.NONE);
+ }
+
success = (result.rows.get(0).columns.size() != 0);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
index c117862..8768de8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -36,7 +37,7 @@ public class IndexedRangeSlicer extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (values == null)
values = generateValues();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
index a887724..0623e4c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -40,7 +41,7 @@ public class Inserter extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
if (values == null)
values = generateValues();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
index c50dd1b..f569f66 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -37,7 +38,7 @@ public class MultiGetter extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
index 308eefe..e462e30 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -36,7 +37,7 @@ public class RangeSlicer extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
String format = "%0" + session.getTotalKeysLength() + "d";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
index b5a8781..412ebdf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.stress.operations;
import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.util.CassandraClient;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -35,7 +36,7 @@ public class Reader extends Operation
super(client, index);
}
- public void run(Cassandra.Client client) throws IOException
+ public void run(CassandraClient client) throws IOException
{
// initialize SlicePredicate with existing SliceRange
SlicePredicate predicate = new SlicePredicate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
new file mode 100644
index 0000000..5136a55
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.protocol.TProtocol;
+
+public class CassandraClient extends Client
+{
+ public Map<Integer, Integer> preparedStatements = new HashMap<Integer, Integer>();
+
+ public CassandraClient(TProtocol protocol)
+ {
+ super(protocol);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
index 4e08909..cbeaad7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.stress.util;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -25,17 +27,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import static com.google.common.base.Charsets.UTF_8;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.Stress;
-import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Compression;
+import org.apache.cassandra.thrift.CqlPreparedResult;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
public abstract class Operation
{
@@ -61,7 +65,7 @@ public abstract class Operation
* @param client Cassandra Thrift client connection
* @throws IOException on any I/O error.
*/
- public abstract void run(Cassandra.Client client) throws IOException;
+ public abstract void run(CassandraClient client) throws IOException;
// Utility methods
@@ -226,13 +230,70 @@ public abstract class Operation
System.err.println(message);
}
- protected String getQuotedCqlBlob(String term)
+ protected String getUnQuotedCqlBlob(String term)
+ {
+ return getUnQuotedCqlBlob(term.getBytes());
+ }
+
+ protected String getUnQuotedCqlBlob(byte[] term)
+ {
+ return Hex.bytesToHex(term);
+ }
+
+ protected List<ByteBuffer> queryParamsAsByteBuffer(List<String> queryParams)
+ {
+ return Lists.transform(queryParams, new Function<String, ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer apply(String param)
+ {
+ return ByteBufferUtil.bytes(param);
+ }
+ });
+ }
+
+ /**
+ * Constructs a CQL query string by replacing instances of the character
+ * '?', with the corresponding parameter.
+ *
+ * @param query base query string to format
+ * @param parms sequence of string query parameters
+ * @return formatted CQL query string
+ */
+ protected static String formatCqlQuery(String query, List<String> parms)
{
- return getQuotedCqlBlob(term.getBytes());
+ int marker = 0, position = 0;
+ StringBuilder result = new StringBuilder();
+
+ if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
+ return query;
+
+ for (String parm : parms)
+ {
+ result.append(query.substring(position, marker));
+ result.append('\'').append(parm).append('\'');
+
+ position = marker + 1;
+ if (-1 == (marker = query.indexOf('?', position + 1)))
+ break;
+ }
+
+ if (position < query.length())
+ result.append(query.substring(position));
+
+ return result.toString();
}
- protected String getQuotedCqlBlob(byte[] term)
+ protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
{
- return String.format("'%s'", Hex.bytesToHex(term));
+ Integer statementId = client.preparedStatements.get(cqlQuery.hashCode());
+ if (statementId == null)
+ {
+ CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
+ statementId = response.itemId;
+ client.preparedStatements.put(cqlQuery.hashCode(), statementId);
+ }
+
+ return statementId;
}
}