You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/23 20:25:40 UTC
svn commit: r1073894 - in
/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress:
./ operations/ util/
Author: brandonwilliams
Date: Wed Feb 23 19:25:40 2011
New Revision: 1073894
URL: http://svn.apache.org/viewvc?rev=1073894&view=rev
Log:
Switch stress.java to a producer/consumer model for better performance.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2020
Removed:
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java
Modified:
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Wed Feb 23 19:25:40 2011
@@ -20,8 +20,8 @@ package org.apache.cassandra.contrib.str
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.*;
@@ -38,9 +38,9 @@ public class Session
// command line options
public static final Options availableOptions = new Options();
- public final AtomicIntegerArray operationCount;
- public final AtomicIntegerArray keyCount;
- public final AtomicLongArray latencies;
+ public final AtomicInteger operations;
+ public final AtomicInteger keys;
+ public final AtomicLong latency;
static
{
@@ -93,7 +93,7 @@ public class Session
private PrintStream out = System.out;
private IndexType indexType = null;
- private Stress.Operation operation = Stress.Operation.INSERT;
+ private Stress.Operations operation = Stress.Operations.INSERT;
private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
@@ -183,7 +183,7 @@ public class Session
unframed = Boolean.parseBoolean(cmd.getOptionValue("m"));
if (cmd.hasOption("o"))
- operation = Stress.Operation.valueOf(cmd.getOptionValue("o").toUpperCase());
+ operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
if (cmd.hasOption("u"))
superColumns = Integer.parseInt(cmd.getOptionValue("u"));
@@ -248,9 +248,9 @@ public class Session
mean = numKeys / 2;
sigma = numKeys * STDev;
- operationCount = new AtomicIntegerArray(threads);
- keyCount = new AtomicIntegerArray(threads);
- latencies = new AtomicLongArray(threads);
+ operations = new AtomicInteger();
+ keys = new AtomicInteger();
+ latency = new AtomicLong();
}
public int getCardinality()
@@ -323,7 +323,7 @@ public class Session
return ignoreErrors;
}
- public Stress.Operation getOperation()
+ public Stress.Operations getOperation()
{
return operation;
}
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java Wed Feb 23 19:25:40 2011
@@ -18,15 +18,18 @@
package org.apache.cassandra.contrib.stress;
import org.apache.cassandra.contrib.stress.operations.*;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.cli.Option;
import java.io.PrintStream;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
public final class Stress
{
- public static enum Operation
+ public static enum Operations
{
INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET
}
@@ -34,9 +37,15 @@ public final class Stress
public static Session session;
public static Random randomizer = new Random();
+ /**
+ * Producer-Consumer model: 1 producer, N consumers
+ */
+ private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
public static void main(String[] arguments) throws Exception
{
- int epoch, total, oldTotal, latency, keyCount, oldKeyCount, oldLatency;
+ long latency, oldLatency;
+ int epoch, total, oldTotal, keyCount, oldKeyCount;
try
{
@@ -49,51 +58,52 @@ public final class Stress
}
// creating keyspace and column families
- if (session.getOperation() == Stress.Operation.INSERT)
+ if (session.getOperation() == Stress.Operations.INSERT)
{
session.createKeySpaces();
}
int threadCount = session.getThreads();
- Thread[] threads = new Thread[threadCount];
- PrintStream out = session.getOutputStream();
+ Thread[] consumers = new Thread[threadCount];
+ PrintStream out = session.getOutputStream();
+
+ out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+
+ int itemsPerThread = session.getKeysPerThread();
+ int modulo = session.getNumKeys() % threadCount;
// creating required type of the threads for the test
- try
- {
- for (int i = 0; i < threadCount; i++)
- {
- threads[i] = createOperation(i);
- }
- }
- catch (Exception e)
+ for (int i = 0; i < threadCount; i++)
{
- System.err.println(e.getMessage());
- return;
+ if (i == threadCount - 1)
+ itemsPerThread += modulo; // last one is going to handle N + modulo items
+
+ consumers[i] = new Consumer(itemsPerThread);
}
+ new Producer().start();
+
// starting worker threads
for (int i = 0; i < threadCount; i++)
{
- threads[i].start();
+ consumers[i].start();
}
// initialization of the values
boolean terminate = false;
- epoch = total = latency = keyCount = 0;
+ latency = 0;
+ epoch = total = keyCount = 0;
int interval = session.getProgressInterval();
int epochIntervals = session.getProgressInterval() * 10;
long testStartTime = System.currentTimeMillis();
- out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
-
while (!terminate)
{
Thread.sleep(100);
int alive = 0;
- for (Thread thread : threads)
+ for (Thread thread : consumers)
if (thread.isAlive()) alive++;
if (alive == 0)
@@ -109,20 +119,9 @@ public final class Stress
oldLatency = latency;
oldKeyCount = keyCount;
- int currentTotal = 0, currentKeyCount = 0, currentLatency = 0;
-
- for (Thread t : threads)
- {
- OperationThread thread = (OperationThread) t;
-
- currentTotal += session.operationCount.get(thread.index);
- currentKeyCount += session.keyCount.get(thread.index);
- currentLatency += session.latencies.get(thread.index);
- }
-
- total = currentTotal;
- keyCount = currentKeyCount;
- latency = currentLatency;
+ total = session.operations.get();
+ keyCount = session.keys.get();
+ latency = session.latency.get();
int opDelta = total - oldTotal;
int keyDelta = keyCount - oldKeyCount;
@@ -136,7 +135,7 @@ public final class Stress
}
}
- private static Thread createOperation(int index)
+ private static Operation createOperation(int index)
{
switch (session.getOperation())
{
@@ -174,4 +173,58 @@ public final class Stress
option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
}
}
+
+ /**
+ * Produces exactly N items (awaits each to be consumed)
+ */
+ private static class Producer extends Thread
+ {
+ public void run()
+ {
+ for (int i = 0; i < session.getNumKeys(); i++)
+ {
+ try
+ {
+ operations.put(createOperation(i));
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println("Producer error - " + e.getMessage());
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Each consumes exactly N items from queue
+ */
+ private static class Consumer extends Thread
+ {
+ private final int items;
+
+ public Consumer(int toConsume)
+ {
+ items = toConsume;
+ }
+
+ public void run()
+ {
+ Cassandra.Client client = session.getClient();
+
+ for (int i = 0; i < items; i++)
+ {
+ try
+ {
+ operations.take().run(client); // running job
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+ }
+
}
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Wed Feb 23 19:25:40 2011
@@ -17,23 +17,23 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-public class IndexedRangeSlicer extends OperationThread
+public class IndexedRangeSlicer extends Operation
{
public IndexedRangeSlicer(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
String format = "%0" + session.getTotalKeysLength() + "d";
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
@@ -46,64 +46,59 @@ public class IndexedRangeSlicer extends
ByteBuffer columnName = ByteBuffer.wrap("C1".getBytes());
- for (int i = range.begins(); i < range.size(); i++)
- {
- int received = 0;
+ int received = 0;
- String startOffset = "0";
- ByteBuffer value = ByteBuffer.wrap(values.get(i % values.size()).getBytes());
+ String startOffset = "0";
+ ByteBuffer value = ByteBuffer.wrap(values.get(index % values.size()).getBytes());
- IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
+ IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
- while (received < expectedPerValue)
- {
- IndexClause clause = new IndexClause(Arrays.asList(expression), ByteBuffer.wrap(startOffset.getBytes()),
- session.getKeysPerCall());
+ while (received < expectedPerValue)
+ {
+ IndexClause clause = new IndexClause(Arrays.asList(expression),
+ ByteBuffer.wrap(startOffset.getBytes()),
+ session.getKeysPerCall());
- List<KeySlice> results = null;
- long start = System.currentTimeMillis();
+ List<KeySlice> results = null;
+ long start = System.currentTimeMillis();
- boolean success = false;
- String exceptionMessage = null;
+ boolean success = false;
+ String exceptionMessage = null;
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
- try
- {
- results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
+ try
+ {
+ results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
}
-
- if (!success)
+ catch (Exception e)
{
- System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
- index,
- session.getRetryTimes(),
- startOffset,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ startOffset,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
- received += results.size();
+ received += results.size();
- // convert max key found back to an integer, and increment it
- startOffset = String.format(format, (1 + getMaxKey(results)));
+ // convert max key found back to an integer, and increment it
+ startOffset = String.format(format, (1 + getMaxKey(results)));
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, results.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
- }
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(results.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
}
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Wed Feb 23 19:25:40 2011
@@ -17,26 +17,27 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class Inserter extends OperationThread
+public class Inserter extends Operation
{
+
public Inserter(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
List<String> values = generateValues();
List<Column> columns = new ArrayList<Column>();
@@ -48,8 +49,10 @@ public class Inserter extends OperationT
// columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
for (int i = 0; i < session.getColumnsPerKey(); i++)
{
- byte[] columnName = ("C" + Integer.toString(i)).getBytes();
- columns.add(new Column(ByteBuffer.wrap(columnName), ByteBuffer.wrap(new byte[] {}), System.currentTimeMillis()));
+ String columnName = ("C" + Integer.toString(i));
+ String columnValue = values.get(index % values.size());
+
+ columns.add(new Column(ByteBufferUtil.bytes(columnName), ByteBufferUtil.bytes(columnValue), System.currentTimeMillis()));
}
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
@@ -62,58 +65,47 @@ public class Inserter extends OperationT
}
}
- for (int i : range)
- {
- String rawKey = String.format(format, i);
- ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
- Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
- record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
- ? getSuperColumnsMutationMap(superColumns)
- : getColumnsMutationMap(columns));
+ String rawKey = String.format(format, index);
+ Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
- String value = values.get(i % values.size());
+ record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
+ ? getSuperColumnsMutationMap(superColumns)
+ : getColumnsMutationMap(columns));
- for (Column c : columns)
- c.value = ByteBuffer.wrap(value.getBytes());
+ long start = System.currentTimeMillis();
- long start = System.currentTimeMillis();
+ boolean success = false;
+ String exceptionMessage = null;
- boolean success = false;
- String exceptionMessage = null;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
- for (int t = 0; t < session.getRetryTimes(); t++)
+ try
{
- if (success)
- break;
-
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- success = true;
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
+ client.batch_mutate(record, session.getConsistencyLevel());
+ success = true;
}
-
- if (!success)
+ catch (Exception e)
{
- System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
- session.getRetryTimes(),
- rawKey,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- break;
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ rawKey,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Wed Feb 23 19:25:40 2011
@@ -17,90 +17,39 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class MultiGetter extends OperationThread
+public class MultiGetter extends Operation
{
public MultiGetter(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
ByteBuffer.wrap(new byte[] {}),
false, session.getColumnsPerKey()));
int offset = index * session.getKeysPerThread();
- Map<ByteBuffer,List<ColumnOrSuperColumn>> results = null;
- int count = (((index + 1) * session.getKeysPerThread()) - offset) / session.getKeysPerCall();
+ Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- for (int i = 0; i < count; i++)
- {
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
-
- long start = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
- success = (results.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, keys.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-
- offset += session.getKeysPerCall();
- }
- }
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
+ List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
- for (int i = 0; i < count; i++)
+ for (int j = 0; j < session.getSuperColumns(); j++)
{
- List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
long start = System.currentTimeMillis();
@@ -120,29 +69,68 @@ public class MultiGetter extends Operati
catch (Exception e)
{
exceptionMessage = getExceptionMessage(e);
- success = false;
}
}
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
- index,
- session.getRetryTimes(),
- keys,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, keys.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(keys.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
offset += session.getKeysPerCall();
}
}
+ else
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+
+ long start = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
+ }
+ }
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(keys.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
+
+ offset += session.getKeysPerCall();
+ }
}
private List<ByteBuffer> generateKeys(int start, int limit)
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Wed Feb 23 19:25:40 2011
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class RangeSlicer extends OperationThread
+public class RangeSlicer extends Operation
{
public RangeSlicer(int index)
@@ -33,87 +35,29 @@ public class RangeSlicer extends Operati
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
String format = "%0" + session.getTotalKeysLength() + "d";
// initial values
- int current = range.begins();
- int limit = range.limit();
- int count = session.getColumnsPerKey();
- int last = current + session.getKeysPerCall();
+ int count = session.getColumnsPerKey();
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[] {}),
ByteBuffer.wrap(new byte[] {}),
- false, count));
+ false,
+ count));
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- while (current < limit)
- {
- byte[] start = String.format(format, current).getBytes();
- byte[] end = String.format(format, last).getBytes();
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
-
- for (int i = 0; i < session.getSuperColumns(); i++)
- {
- String superColumnName = "S" + Integer.toString(i);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
-
- long startTime = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
- success = (slices.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
- index,
- session.getRetryTimes(),
- new String(start),
- new String(end),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+ byte[] start = String.format(format, index).getBytes();
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
- }
+ List<KeySlice> slices = new ArrayList<KeySlice>();
+ KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- current += slices.size() + 1;
- last = current + slices.size() + 1;
- session.keyCount.getAndAdd(index, slices.size());
- }
- }
- else
- {
- ColumnParent parent = new ColumnParent("Standard1");
-
- while (current < limit)
+ for (int i = 0; i < session.getSuperColumns(); i++)
{
- byte[] start = String.format(format, current).getBytes();
- byte[] end = String.format(format, last).getBytes();
-
- List<KeySlice> slices = new ArrayList<KeySlice>();
- KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
+ String superColumnName = "S" + Integer.toString(i);
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
long startTime = System.currentTimeMillis();
@@ -122,9 +66,6 @@ public class RangeSlicer extends Operati
for (int t = 0; t < session.getRetryTimes(); t++)
{
- if (success)
- break;
-
try
{
slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
@@ -139,24 +80,62 @@ public class RangeSlicer extends Operati
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
- index,
- session.getRetryTimes(),
- new String(start),
- new String(end),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
+ error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
- current += slices.size() + 1;
- last = current + slices.size() + 1;
+ session.operations.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - startTime);
+ }
+
+ session.keys.getAndAdd(slices.size());
+ }
+ else
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ byte[] start = String.format(format, index).getBytes();
+
+ List<KeySlice> slices = new ArrayList<KeySlice>();
+ KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+
+ long startTime = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndAdd(index, slices.size());
- session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
+ }
}
+
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndAdd(slices.size());
+ session.latency.getAndAdd(System.currentTimeMillis() - startTime);
}
}
}
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Wed Feb 23 19:25:40 2011
@@ -17,25 +17,22 @@
*/
package org.apache.cassandra.contrib.stress.operations;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
import java.io.IOException;
-import java.lang.AssertionError;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.List;
-public class Reader extends OperationThread
+public class Reader extends Operation
{
public Reader(int index)
{
super(index);
}
- public void run()
+ public void run(Cassandra.Client client) throws IOException
{
SliceRange sliceRange = new SliceRange();
@@ -50,75 +47,23 @@ public class Reader extends OperationThr
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
- runSuperColumnReader(predicate);
+ runSuperColumnReader(predicate, client);
}
else
{
- runColumnReader(predicate);
+ runColumnReader(predicate, client);
}
}
- private void runSuperColumnReader(SlicePredicate predicate)
+ private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
{
- for (int i = 0; i < session.getKeysPerThread(); i++)
- {
- byte[] rawKey = generateKey();
- ByteBuffer key = ByteBuffer.wrap(rawKey);
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- String superColumn = 'S' + Integer.toString(j);
- ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
-
- long start = System.currentTimeMillis();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
- success = (columns.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
- session.getRetryTimes(),
- new String(rawKey),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
- if (!session.ignoreErrors())
- return;
- }
-
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
- }
- }
- }
-
- private void runColumnReader(SlicePredicate predicate)
- {
- ColumnParent parent = new ColumnParent("Standard1");
+ byte[] rawKey = generateKey();
+ ByteBuffer key = ByteBuffer.wrap(rawKey);
- for (int i = 0; i < session.getKeysPerThread(); i++)
+ for (int j = 0; j < session.getSuperColumns(); j++)
{
- byte[] key = generateKey();
- ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+ String superColumn = 'S' + Integer.toString(j);
+ ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
long start = System.currentTimeMillis();
@@ -133,7 +78,7 @@ public class Reader extends OperationThr
try
{
List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
success = (columns.size() != 0);
}
catch (Exception e)
@@ -145,18 +90,61 @@ public class Reader extends OperationThr
if (!success)
{
- System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+ error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(rawKey),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
+ }
+ }
+
+ private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
+ {
+ ColumnParent parent = new ColumnParent("Standard1");
+
+ byte[] key = generateKey();
+ ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+
+ long start = System.currentTimeMillis();
+
+ boolean success = false;
+ String exceptionMessage = null;
- if (!session.ignoreErrors())
- return;
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
- session.operationCount.getAndIncrement(index);
- session.keyCount.getAndIncrement(index);
- session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(key),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
}
+
+ session.operations.getAndIncrement();
+ session.keys.getAndIncrement();
+ session.latency.getAndAdd(System.currentTimeMillis() - start);
}
+
}