You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/17 23:13:41 UTC
svn commit: r1071811 - in
/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress:
./ operations/
Author: brandonwilliams
Date: Thu Feb 17 22:13:40 2011
New Revision: 1071811
URL: http://svn.apache.org/viewvc?rev=1071811&view=rev
Log:
Add 'keep trying' behavior to stress.java.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2047
Modified:
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Thu Feb 17 22:13:40 2011
@@ -61,7 +61,8 @@ public class Session
availableOptions.addOption("o", "operation", true, "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET), default:INSERT");
availableOptions.addOption("u", "supercolumns", true, "Number of super columns per key, default:1");
availableOptions.addOption("y", "family-type", true, "Column Family Type (Super, Standard), default:Standard");
- availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading, default:false");
+ availableOptions.addOption("K", "keep-trying", true, "Retry on-going operation N times (in case of failure). positive integer, default:10");
+ availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
availableOptions.addOption("i", "progress-interval", true, "Progress Report Interval (seconds), default:10");
availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000");
availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1");
@@ -80,13 +81,14 @@ public class Session
private String[] nodes = new String[] { "127.0.0.1" };
private boolean random = false;
private boolean unframed = false;
- private boolean ignoreErrors = false;
+ private int retryTimes = 10;
private int port = 9160;
private int superColumns = 1;
private int progressInterval = 10;
private int keysPerCall = 1000;
private int replicationFactor = 1;
+ private boolean ignoreErrors = false;
private PrintStream out = System.out;
@@ -97,6 +99,7 @@ public class Session
private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
+
// required by Gaussian distribution.
protected int mean;
protected float sigma;
@@ -188,8 +191,21 @@ public class Session
if (cmd.hasOption("y"))
columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
+ if (cmd.hasOption("K"))
+ {
+ retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
+
+ if (retryTimes <= 0)
+ {
+ throw new RuntimeException("--keep-trying option value should be > 0");
+ }
+ }
+
if (cmd.hasOption("k"))
+ {
+ retryTimes = 1;
ignoreErrors = true;
+ }
if (cmd.hasOption("i"))
progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
@@ -297,6 +313,11 @@ public class Session
return consistencyLevel;
}
+ public int getRetryTimes()
+ {
+ return retryTimes;
+ }
+
public boolean ignoreErrors()
{
return ignoreErrors;
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Thu Feb 17 22:13:40 2011
@@ -63,21 +63,33 @@ public class IndexedRangeSlicer extends
List<KeySlice> results = null;
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("No indexed values from offset received: %s%n", startOffset);
-
- if (!session.ignoreErrors())
- break;
+ results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on get_indexed_slices call for offset %s - %s%n", startOffset, getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ startOffset,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Thu Feb 17 22:13:40 2011
@@ -64,7 +64,8 @@ public class Inserter extends OperationT
for (int i : range)
{
- ByteBuffer key = ByteBuffer.wrap(String.format(format, i).getBytes());
+ String rawKey = String.format(format, i);
+ ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
@@ -78,23 +79,35 @@ public class Inserter extends OperationT
long start = System.currentTimeMillis();
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- }
- catch (Exception e)
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
+ if (success)
+ break;
+
try
{
- System.err.printf("Error while inserting key %s - %s%n", ByteBufferUtil.string(key), getExceptionMessage(e));
+ client.batch_mutate(record, session.getConsistencyLevel());
+ success = true;
}
- catch (CharacterCodingException e1)
+ catch (Exception e)
{
- throw new AssertionError(e1); // keys are valid strings
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
+ session.getRetryTimes(),
+ rawKey,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- return;
+ break;
}
session.operationCount.getAndIncrement(index);
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Thu Feb 17 22:13:40 2011
@@ -55,21 +55,32 @@ public class MultiGetter extends Operati
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("Keys %s were not found.%n", keys);
-
- if (!session.ignoreErrors())
- break;
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
@@ -93,21 +104,33 @@ public class MultiGetter extends Operati
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("Keys %s were not found.%n", keys);
-
- if (!session.ignoreErrors())
- break;
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Thu Feb 17 22:13:40 2011
@@ -64,21 +64,31 @@ public class RangeSlicer extends Operati
long startTime = System.currentTimeMillis();
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ boolean success = false;
+ String exceptionMessage = null;
- if (slices.size() == 0)
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ try
{
- System.err.printf("Range %s->%s not found in Super Column %s.%n", new String(start), new String(end), superColumnName);
-
- if (!session.ignoreErrors())
- break;
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading Super Column %s - %s%n", superColumnName, getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ new String(end),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
@@ -107,21 +117,34 @@ public class RangeSlicer extends Operati
long startTime = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ if (success)
+ break;
- if (slices.size() == 0)
+ try
{
- System.err.printf("Range %s->%s not found.%n", String.format(format, current), String.format(format, last));
-
- if (!session.ignoreErrors())
- break;
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading range %s->%s - %s%n", String.format(format, current), String.format(format, last), getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ new String(end),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Thu Feb 17 22:13:40 2011
@@ -22,6 +22,7 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.lang.AssertionError;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
@@ -61,7 +62,8 @@ public class Reader extends OperationThr
{
for (int i = 0; i < session.getKeysPerThread(); i++)
{
- ByteBuffer key = ByteBuffer.wrap(generateKey());
+ byte[] rawKey = generateKey();
+ ByteBuffer key = ByteBuffer.wrap(rawKey);
for (int j = 0; j < session.getSuperColumns(); j++)
{
@@ -70,32 +72,36 @@ public class Reader extends OperationThr
long start = System.currentTimeMillis();
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+ boolean success = false;
+ String exceptionMessage = null;
- if (columns.size() == 0)
- {
- System.err.printf("Key %s not found in Super Column %s.%n", ByteBufferUtil.string(key), superColumn);
-
- if (!session.ignoreErrors())
- break;
- }
- }
- catch (Exception e)
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
+ if (success)
+ break;
+
try
{
- System.err.printf("Error while reading Super Column %s key %s - %s%n", superColumn, ByteBufferUtil.string(key), getExceptionMessage(e));
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
}
- catch (CharacterCodingException e1)
+ catch (Exception e)
{
- throw new AssertionError(e1); // keys are valid string
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+ session.getRetryTimes(),
+ new String(rawKey),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- break;
+ return;
}
session.operationCount.getAndIncrement(index);
@@ -116,25 +122,36 @@ public class Reader extends OperationThr
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (columns.size() == 0)
+ try
{
- System.err.println(String.format("Key %s not found.", new String(key)));
-
- if (!session.ignoreErrors())
- break;
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading key %s - %s%n", new String(key), getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+ session.getRetryTimes(),
+ new String(key),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- break;
+ return;
}
session.operationCount.getAndIncrement(index);