You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/18 23:52:23 UTC
svn commit: r1072184 - in /cassandra/trunk: ./ conf/ contrib/
contrib/py_stress/ contrib/stress/src/org/apache/cassandra/contrib/stress/
contrib/stress/src/org/apache/cassandra/contrib/stress/operations/
interface/thrift/gen-java/org/apache/cassandra/t...
Author: jbellis
Date: Fri Feb 18 22:52:22 2011
New Revision: 1072184
URL: http://svn.apache.org/viewvc?rev=1072184&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/py_stress/stress.py
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7:1026516-1071868
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Feb 18 22:52:22 2011
@@ -17,7 +17,13 @@
* fixes for cache save/load (CASSANDRA-2172, -2174)
* Handle whole-row deletions in CFOutputFormat (CASSANDRA-2014)
* Make memtable_flush_writers flush in parallel (CASSANDRA-2178)
+ * make key cache preheating default to false; enable with
+ -Dcompaction_preheat_key_cache=true (CASSANDRA-2175)
+ * refactor stress.py to have only one copy of the format string
+ used for creating row keys (CASSANDRA-2108)
+ * validate index names for \w+ (CASSANDRA-2196)
* Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
+ * update memtable_throughput to be a long (CASSANDRA-2158)
0.7.2
@@ -92,6 +98,7 @@
* bound hints CF throughput between 32M and 256M (CASSANDRA-2148)
* continue starting when invalid saved cache entries are encountered
(CASSANDRA-2076)
+ * add max_hint_window_in_ms option (CASSANDRA-1459)
0.7.0-final
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Feb 18 22:52:22 2011
@@ -231,6 +231,11 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable. Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.7/contrib:1026516-1071868
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Fri Feb 18 22:52:22 2011
@@ -127,6 +127,9 @@ if options.nodefile != None:
with open(options.nodefile) as f:
nodes = [n.strip() for n in f.readlines() if len(n.strip()) > 0]
+#format string for keys
+fmt = '%0' + str(len(str(total_keys))) + 'd'
+
# a generator that generates all keys according to a bell curve centered
# around the middle of the keys generated (0..total_keys). Remember that
# about 68% of keys will be within stdev away from the mean and
@@ -148,7 +151,6 @@ def generate_values():
return values
def key_generator_gauss():
- fmt = '%0' + str(len(str(total_keys))) + 'd'
while True:
guess = gauss(mean, stdev)
if 0 <= guess < total_keys:
@@ -157,7 +159,6 @@ def key_generator_gauss():
# a generator that will generate all keys w/ equal probability. this is the
# worst case for caching.
def key_generator_random():
- fmt = '%0' + str(len(str(total_keys))) + 'd'
return fmt % randint(0, total_keys - 1)
key_generator = key_generator_gauss
@@ -220,7 +221,6 @@ class Inserter(Operation):
def run(self):
values = generate_values()
columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
- fmt = '%0' + str(len(str(total_keys))) + 'd'
if 'super' == options.cftype:
supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
for i in self.range:
@@ -295,7 +295,6 @@ class RangeSlicer(Operation):
end = self.range[-1]
current = begin
last = current + options.rangecount
- fmt = '%0' + str(len(str(total_keys))) + 'd'
p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
if 'super' == options.cftype:
while current < end:
@@ -348,7 +347,6 @@ class RangeSlicer(Operation):
# from the thread's appointed range
class IndexedRangeSlicer(Operation):
def run(self):
- fmt = '%0' + str(len(str(total_keys))) + 'd'
p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
values = generate_values()
parent = ColumnParent('Standard1')
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Fri Feb 18 22:52:22 2011
@@ -19,9 +19,7 @@ package org.apache.cassandra.contrib.str
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLongArray;
@@ -29,6 +27,7 @@ import org.apache.commons.cli.*;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
+import org.apache.commons.lang.StringUtils;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
@@ -62,12 +61,15 @@ public class Session
availableOptions.addOption("o", "operation", true, "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET), default:INSERT");
availableOptions.addOption("u", "supercolumns", true, "Number of super columns per key, default:1");
availableOptions.addOption("y", "family-type", true, "Column Family Type (Super, Standard), default:Standard");
- availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading, default:false");
+ availableOptions.addOption("K", "keep-trying", true, "Retry on-going operation N times (in case of failure). positive integer, default:10");
+ availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
availableOptions.addOption("i", "progress-interval", true, "Progress Report Interval (seconds), default:10");
availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000");
availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1");
availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)");
+ availableOptions.addOption("R", "replication-strategy", true, "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
+ availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
}
private int numKeys = 1000 * 1000;
@@ -79,13 +81,14 @@ public class Session
private String[] nodes = new String[] { "127.0.0.1" };
private boolean random = false;
private boolean unframed = false;
- private boolean ignoreErrors = false;
+ private int retryTimes = 10;
private int port = 9160;
private int superColumns = 1;
private int progressInterval = 10;
private int keysPerCall = 1000;
private int replicationFactor = 1;
+ private boolean ignoreErrors = false;
private PrintStream out = System.out;
@@ -93,6 +96,9 @@ public class Session
private Stress.Operation operation = Stress.Operation.INSERT;
private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+ private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
+ private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
+
// required by Gaussian distribution.
protected int mean;
@@ -185,8 +191,21 @@ public class Session
if (cmd.hasOption("y"))
columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
+ if (cmd.hasOption("K"))
+ {
+ retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
+
+ if (retryTimes <= 0)
+ {
+ throw new RuntimeException("--keep-trying option value should be > 0");
+ }
+ }
+
if (cmd.hasOption("k"))
+ {
+ retryTimes = 1;
ignoreErrors = true;
+ }
if (cmd.hasOption("i"))
progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
@@ -202,6 +221,24 @@ public class Session
if (cmd.hasOption("x"))
indexType = IndexType.valueOf(cmd.getOptionValue("x").toUpperCase());
+
+ if (cmd.hasOption("R"))
+ replicationStrategy = cmd.getOptionValue("R");
+
+ if (cmd.hasOption("O"))
+ {
+ String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
+
+ for (String pair : pairs)
+ {
+ String[] keyAndValue = StringUtils.split(pair, ':');
+
+ if (keyAndValue.length != 2)
+ throw new RuntimeException("Invalid --strategy-properties value.");
+
+ replicationStrategyOptions.put(keyAndValue[0], keyAndValue[1]);
+ }
+ }
}
catch (ParseException e)
{
@@ -276,6 +313,11 @@ public class Session
return consistencyLevel;
}
+ public int getRetryTimes()
+ {
+ return retryTimes;
+ }
+
public boolean ignoreErrors()
{
return ignoreErrors;
@@ -337,8 +379,14 @@ public class Session
CfDef superCfDef = new CfDef("Keyspace1", "Super1").setColumn_metadata(Arrays.asList(superSubColumn)).setColumn_type("Super");
keyspace.setName("Keyspace1");
- keyspace.setStrategy_class("org.apache.cassandra.locator.SimpleStrategy");
+ keyspace.setStrategy_class(replicationStrategy);
keyspace.setReplication_factor(replicationFactor);
+
+ if (!replicationStrategyOptions.isEmpty())
+ {
+ keyspace.setStrategy_options(replicationStrategyOptions);
+ }
+
keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef)));
Cassandra.Client client = getClient(false);
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Fri Feb 18 22:52:22 2011
@@ -63,21 +63,33 @@ public class IndexedRangeSlicer extends
List<KeySlice> results = null;
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("No indexed values from offset received: %s%n", startOffset);
-
- if (!session.ignoreErrors())
- break;
+ results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on get_indexed_slices call for offset %s - %s%n", startOffset, getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+ index,
+ session.getRetryTimes(),
+ startOffset,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Fri Feb 18 22:52:22 2011
@@ -64,7 +64,8 @@ public class Inserter extends OperationT
for (int i : range)
{
- ByteBuffer key = ByteBuffer.wrap(String.format(format, i).getBytes());
+ String rawKey = String.format(format, i);
+ ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
@@ -78,23 +79,35 @@ public class Inserter extends OperationT
long start = System.currentTimeMillis();
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- }
- catch (Exception e)
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
+ if (success)
+ break;
+
try
{
- System.err.printf("Error while inserting key %s - %s%n", ByteBufferUtil.string(key), getExceptionMessage(e));
+ client.batch_mutate(record, session.getConsistencyLevel());
+ success = true;
}
- catch (CharacterCodingException e1)
+ catch (Exception e)
{
- throw new AssertionError(e1); // keys are valid strings
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
+ session.getRetryTimes(),
+ rawKey,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- return;
+ break;
}
session.operationCount.getAndIncrement(index);
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Fri Feb 18 22:52:22 2011
@@ -55,21 +55,32 @@ public class MultiGetter extends Operati
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("Keys %s were not found.%n", keys);
-
- if (!session.ignoreErrors())
- break;
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
@@ -93,21 +104,33 @@ public class MultiGetter extends Operati
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (results.size() == 0)
+ try
{
- System.err.printf("Keys %s were not found.%n", keys);
-
- if (!session.ignoreErrors())
- break;
+ results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+ success = (results.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+ index,
+ session.getRetryTimes(),
+ keys,
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Fri Feb 18 22:52:22 2011
@@ -64,21 +64,31 @@ public class RangeSlicer extends Operati
long startTime = System.currentTimeMillis();
- try
- {
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ boolean success = false;
+ String exceptionMessage = null;
- if (slices.size() == 0)
+ for (int t = 0; t < session.getRetryTimes(); t++)
+ {
+ try
{
- System.err.printf("Range %s->%s not found in Super Column %s.%n", new String(start), new String(end), superColumnName);
-
- if (!session.ignoreErrors())
- break;
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading Super Column %s - %s%n", superColumnName, getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ new String(end),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
@@ -107,21 +117,34 @@ public class RangeSlicer extends Operati
long startTime = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ if (success)
+ break;
- if (slices.size() == 0)
+ try
{
- System.err.printf("Range %s->%s not found.%n", String.format(format, current), String.format(format, last));
-
- if (!session.ignoreErrors())
- break;
+ slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+ success = (slices.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading range %s->%s - %s%n", String.format(format, current), String.format(format, last), getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
+ index,
+ session.getRetryTimes(),
+ new String(start),
+ new String(end),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
return;
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Fri Feb 18 22:52:22 2011
@@ -22,6 +22,7 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import java.io.IOException;
import java.lang.AssertionError;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
@@ -61,7 +62,8 @@ public class Reader extends OperationThr
{
for (int i = 0; i < session.getKeysPerThread(); i++)
{
- ByteBuffer key = ByteBuffer.wrap(generateKey());
+ byte[] rawKey = generateKey();
+ ByteBuffer key = ByteBuffer.wrap(rawKey);
for (int j = 0; j < session.getSuperColumns(); j++)
{
@@ -70,32 +72,36 @@ public class Reader extends OperationThr
long start = System.currentTimeMillis();
- try
- {
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+ boolean success = false;
+ String exceptionMessage = null;
- if (columns.size() == 0)
- {
- System.err.printf("Key %s not found in Super Column %s.%n", ByteBufferUtil.string(key), superColumn);
-
- if (!session.ignoreErrors())
- break;
- }
- }
- catch (Exception e)
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
+ if (success)
+ break;
+
try
{
- System.err.printf("Error while reading Super Column %s key %s - %s%n", superColumn, ByteBufferUtil.string(key), getExceptionMessage(e));
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
}
- catch (CharacterCodingException e1)
+ catch (Exception e)
{
- throw new AssertionError(e1); // keys are valid string
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
+ }
+
+ if (!success)
+ {
+ System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+ session.getRetryTimes(),
+ new String(rawKey),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- break;
+ return;
}
session.operationCount.getAndIncrement(index);
@@ -116,25 +122,36 @@ public class Reader extends OperationThr
long start = System.currentTimeMillis();
- try
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < session.getRetryTimes(); t++)
{
- List<ColumnOrSuperColumn> columns;
- columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ if (success)
+ break;
- if (columns.size() == 0)
+ try
{
- System.err.println(String.format("Key %s not found.", new String(key)));
-
- if (!session.ignoreErrors())
- break;
+ List<ColumnOrSuperColumn> columns;
+ columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+ success = (columns.size() != 0);
+ }
+ catch (Exception e)
+ {
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
}
}
- catch (Exception e)
+
+ if (!success)
{
- System.err.printf("Error while reading key %s - %s%n", new String(key), getExceptionMessage(e));
+ System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+ session.getRetryTimes(),
+ new String(key),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
if (!session.ignoreErrors())
- break;
+ return;
}
session.operationCount.getAndIncrement(index);
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071868
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071868
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071868
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071868
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071868
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java Fri Feb 18 22:52:22 2011
@@ -90,32 +90,6 @@ public class CliMain
thriftClient = cassandraClient;
cliClient = new CliClient(sessionState, thriftClient);
- if (sessionState.keyspace != null)
- {
- try
- {
- sessionState.keyspace = CliCompiler.getKeySpace(sessionState.keyspace, thriftClient.describe_keyspaces());;
- thriftClient.set_keyspace(sessionState.keyspace);
- cliClient.setKeySpace(sessionState.keyspace);
- updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
- }
- catch (InvalidRequestException e)
- {
- sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
- return;
- }
- catch (TException e)
- {
- sessionState.err.println("Did you specify 'keyspace'?");
- return;
- }
- catch (NotFoundException e)
- {
- sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
- return;
- }
- }
-
if ((sessionState.username != null) && (sessionState.password != null))
{
// Authenticate
@@ -149,6 +123,32 @@ public class CliMain
}
}
+ if (sessionState.keyspace != null)
+ {
+ try
+ {
+ sessionState.keyspace = CliCompiler.getKeySpace(sessionState.keyspace, thriftClient.describe_keyspaces());;
+ thriftClient.set_keyspace(sessionState.keyspace);
+ cliClient.setKeySpace(sessionState.keyspace);
+ updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
+ }
+ catch (InvalidRequestException e)
+ {
+ sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
+ return;
+ }
+ catch (TException e)
+ {
+ sessionState.err.println("Did you specify 'keyspace'?");
+ return;
+ }
+ catch (NotFoundException e)
+ {
+ sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
+ return;
+ }
+ }
+
// Lookup the cluster name, this is to make it clear which cluster the user is connected to
String clusterName;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Feb 18 22:52:22 2011
@@ -110,6 +110,7 @@ public class Config
public Double reduce_cache_sizes_at = 1.0;
public double reduce_cache_capacity_to = 0.6;
public int hinted_handoff_throttle_delay_in_ms = 0;
+ public boolean compaction_preheat_key_cache = true;
public static enum CommitLogSync {
periodic,
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Feb 18 22:52:22 2011
@@ -1196,4 +1196,9 @@ public class DatabaseDescriptor
{
return conf.hinted_handoff_throttle_delay_in_ms;
}
+
+ public static boolean getPreheatKeyCache()
+ {
+ return conf.compaction_preheat_key_cache;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Feb 18 22:52:22 2011
@@ -431,12 +431,15 @@ public class CompactionManager implement
long position = writer.append(row);
totalkeysWritten++;
- for (SSTableReader sstable : sstables)
+ if (DatabaseDescriptor.getPreheatKeyCache())
{
- if (sstable.getCachedPosition(row.key) != null)
+ for (SSTableReader sstable : sstables)
{
- cachedKeys.put(row.key, position);
- break;
+ if (sstable.getCachedPosition(row.key) != null)
+ {
+ cachedKeys.put(row.key, position);
+ break;
+ }
}
}
}
@@ -448,7 +451,7 @@ public class CompactionManager implement
SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
- for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet())
+ for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
ssTable.cacheKey(entry.getKey(), entry.getValue());
submitMinorIfNeeded(cfs);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Feb 18 22:52:22 2011
@@ -28,6 +28,7 @@ import java.util.*;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IFilter;
@@ -155,4 +156,49 @@ public class StreamingTransferTest exten
assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test"), new QueryPath("Standard1")));
assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), new QueryPath("Standard1")));
}
+
+ @Test
+ public void testTransferOfMultipleColumnFamilies() throws Exception
+ {
+ String keyspace = "Keyspace1";
+ IPartitioner p = StorageService.getPartitioner();
+ String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" };
+ List<SSTableReader> ssTableReaders = new ArrayList<SSTableReader>();
+
+ // ranges to transfer
+ List<Range> ranges = new ArrayList<Range>();
+
+ for (String cf : columnFamilies)
+ {
+ Set<String> content = new HashSet<String>();
+
+ content.add("data-" + cf + "-1");
+ content.add("data-" + cf + "-2");
+ content.add("data-" + cf + "-3");
+
+ SSTableUtils.Context context = SSTableUtils.prepare().ks(keyspace).cf(cf);
+
+ ssTableReaders.add(context.write(content));
+ ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("data-" + cf + "-3"))));
+ }
+
+ StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
+ StreamOut.transferSSTables(session, ssTableReaders, ranges);
+
+ session.await();
+
+ for (String cf : columnFamilies)
+ {
+ ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(cf);
+ List<Row> rows = Util.getRangeSlice(store);
+
+ assert rows.size() >= 3;
+
+ for (int i = 0; i < 3; i++)
+ {
+ String expectedKey = "data-" + cf + "-" + (i + 1);
+ assertEquals(p.decorateKey(ByteBufferUtil.bytes(expectedKey)), rows.get(i).key);
+ }
+ }
+ }
}