You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/26 20:42:41 UTC
svn commit: r1151205 - in /cassandra/trunk: ./ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/
src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/cli/
src/java/org/apache/cassandra/config/ src/java/org/apache...
Author: jbellis
Date: Tue Jul 26 18:42:37 2011
New Revision: 1151205
URL: http://svn.apache.org/viewvc?rev=1151205&view=rev
Log:
add row_cache_keys_to_save CF option
patch by Chris Burroughs; reviewed by jbellis for CASSANDRA-1966
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
cassandra/trunk/src/avro/internode.genavro
cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jul 26 18:42:37 2011
@@ -18,6 +18,7 @@
* store hints as serialized mutations instead of pointers to data rows
* store hints in the coordinator node instead of in the closest
replica (CASSANDRA-2914).
+ * add row_cache_keys_to_save CF option (CASSANDRA-1966)
0.8.2
Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Jul 26 18:42:37 2011
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
# for every edit that doesn't result in a change to major/minor.
#
# See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "19.10.0"
+const string VERSION = "19.11.0"
#
@@ -396,6 +396,7 @@ struct CfDef {
28: optional binary key_alias,
29: optional string compaction_strategy,
30: optional map<string,string> compaction_strategy_options,
+ 31: optional i32 row_cache_keys_to_save,
}
/* describes a keyspace. */
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Tue Jul 26 18:42:37 2011
@@ -9086,6 +9086,8 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -17041,8 +17043,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -25752,6 +25752,8 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Jul 26 18:42:37 2011
@@ -71,6 +71,7 @@ public class CfDef implements org.apache
private static final org.apache.thrift.protocol.TField KEY_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("key_alias", org.apache.thrift.protocol.TType.STRING, (short)28);
private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy", org.apache.thrift.protocol.TType.STRING, (short)29);
private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy_options", org.apache.thrift.protocol.TType.MAP, (short)30);
+ private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31);
public String keyspace;
public String name;
@@ -98,6 +99,7 @@ public class CfDef implements org.apache
public ByteBuffer key_alias;
public String compaction_strategy;
public Map<String,String> compaction_strategy_options;
+ public int row_cache_keys_to_save;
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -126,7 +128,8 @@ public class CfDef implements org.apache
ROW_CACHE_PROVIDER((short)27, "row_cache_provider"),
KEY_ALIAS((short)28, "key_alias"),
COMPACTION_STRATEGY((short)29, "compaction_strategy"),
- COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options");
+ COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"),
+ ROW_CACHE_KEYS_TO_SAVE((short)31, "row_cache_keys_to_save");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -193,6 +196,8 @@ public class CfDef implements org.apache
return COMPACTION_STRATEGY;
case 30: // COMPACTION_STRATEGY_OPTIONS
return COMPACTION_STRATEGY_OPTIONS;
+ case 31: // ROW_CACHE_KEYS_TO_SAVE
+ return ROW_CACHE_KEYS_TO_SAVE;
default:
return null;
}
@@ -246,7 +251,8 @@ public class CfDef implements org.apache
private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 10;
private static final int __REPLICATE_ON_WRITE_ISSET_ID = 11;
private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 12;
- private BitSet __isset_bit_vector = new BitSet(13);
+ private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 13;
+ private BitSet __isset_bit_vector = new BitSet(14);
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -306,6 +312,8 @@ public class CfDef implements org.apache
new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.ROW_CACHE_KEYS_TO_SAVE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_keys_to_save", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap);
}
@@ -409,6 +417,7 @@ public class CfDef implements org.apache
}
this.compaction_strategy_options = __this__compaction_strategy_options;
}
+ this.row_cache_keys_to_save = other.row_cache_keys_to_save;
}
public CfDef deepCopy() {
@@ -459,6 +468,8 @@ public class CfDef implements org.apache
this.key_alias = null;
this.compaction_strategy = null;
this.compaction_strategy_options = null;
+ setRow_cache_keys_to_saveIsSet(false);
+ this.row_cache_keys_to_save = 0;
}
public String getKeyspace() {
@@ -1108,6 +1119,29 @@ public class CfDef implements org.apache
}
}
+ public int getRow_cache_keys_to_save() {
+ return this.row_cache_keys_to_save;
+ }
+
+ public CfDef setRow_cache_keys_to_save(int row_cache_keys_to_save) {
+ this.row_cache_keys_to_save = row_cache_keys_to_save;
+ setRow_cache_keys_to_saveIsSet(true);
+ return this;
+ }
+
+ public void unsetRow_cache_keys_to_save() {
+ __isset_bit_vector.clear(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID);
+ }
+
+ /** Returns true if field row_cache_keys_to_save is set (has been assigned a value) and false otherwise */
+ public boolean isSetRow_cache_keys_to_save() {
+ return __isset_bit_vector.get(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID);
+ }
+
+ public void setRow_cache_keys_to_saveIsSet(boolean value) {
+ __isset_bit_vector.set(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case KEYSPACE:
@@ -1318,6 +1352,14 @@ public class CfDef implements org.apache
}
break;
+ case ROW_CACHE_KEYS_TO_SAVE:
+ if (value == null) {
+ unsetRow_cache_keys_to_save();
+ } else {
+ setRow_cache_keys_to_save((Integer)value);
+ }
+ break;
+
}
}
@@ -1401,6 +1443,9 @@ public class CfDef implements org.apache
case COMPACTION_STRATEGY_OPTIONS:
return getCompaction_strategy_options();
+ case ROW_CACHE_KEYS_TO_SAVE:
+ return new Integer(getRow_cache_keys_to_save());
+
}
throw new IllegalStateException();
}
@@ -1464,6 +1509,8 @@ public class CfDef implements org.apache
return isSetCompaction_strategy();
case COMPACTION_STRATEGY_OPTIONS:
return isSetCompaction_strategy_options();
+ case ROW_CACHE_KEYS_TO_SAVE:
+ return isSetRow_cache_keys_to_save();
}
throw new IllegalStateException();
}
@@ -1715,6 +1762,15 @@ public class CfDef implements org.apache
return false;
}
+ boolean this_present_row_cache_keys_to_save = true && this.isSetRow_cache_keys_to_save();
+ boolean that_present_row_cache_keys_to_save = true && that.isSetRow_cache_keys_to_save();
+ if (this_present_row_cache_keys_to_save || that_present_row_cache_keys_to_save) {
+ if (!(this_present_row_cache_keys_to_save && that_present_row_cache_keys_to_save))
+ return false;
+ if (this.row_cache_keys_to_save != that.row_cache_keys_to_save)
+ return false;
+ }
+
return true;
}
@@ -1852,6 +1908,11 @@ public class CfDef implements org.apache
if (present_compaction_strategy_options)
builder.append(compaction_strategy_options);
+ boolean present_row_cache_keys_to_save = true && (isSetRow_cache_keys_to_save());
+ builder.append(present_row_cache_keys_to_save);
+ if (present_row_cache_keys_to_save)
+ builder.append(row_cache_keys_to_save);
+
return builder.toHashCode();
}
@@ -2123,6 +2184,16 @@ public class CfDef implements org.apache
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetRow_cache_keys_to_save()).compareTo(typedOther.isSetRow_cache_keys_to_save());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRow_cache_keys_to_save()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_cache_keys_to_save, typedOther.row_cache_keys_to_save);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -2358,6 +2429,14 @@ public class CfDef implements org.apache
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 31: // ROW_CACHE_KEYS_TO_SAVE
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ this.row_cache_keys_to_save = iprot.readI32();
+ setRow_cache_keys_to_saveIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
@@ -2540,6 +2619,11 @@ public class CfDef implements org.apache
oprot.writeFieldEnd();
}
}
+ if (isSetRow_cache_keys_to_save()) {
+ oprot.writeFieldBegin(ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC);
+ oprot.writeI32(this.row_cache_keys_to_save);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -2752,6 +2836,12 @@ public class CfDef implements org.apache
}
first = false;
}
+ if (isSetRow_cache_keys_to_save()) {
+ if (!first) sb.append(", ");
+ sb.append("row_cache_keys_to_save:");
+ sb.append(this.row_cache_keys_to_save);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java Tue Jul 26 18:42:37 2011
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
public class Constants {
- public static final String VERSION = "19.10.0";
+ public static final String VERSION = "19.11.0";
}
Modified: cassandra/trunk/src/avro/internode.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/avro/internode.genavro (original)
+++ cassandra/trunk/src/avro/internode.genavro Tue Jul 26 18:42:37 2011
@@ -57,6 +57,7 @@ protocol InterNode {
union { null, int } max_compaction_threshold = null;
union { int, null } row_cache_save_period_in_seconds = 0;
union { int, null } key_cache_save_period_in_seconds = 3600;
+ union { int, null } row_cache_keys_to_save = null;
union { null, int } memtable_throughput_in_mb = null;
union { null, double} memtable_operations_in_millions = null;
union { null, double} merge_shards_chance = null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Jul 26 18:42:37 2011
@@ -77,12 +77,12 @@ public abstract class AutoSavingCache<K,
return DatabaseDescriptor.getSerializedCachePath(tableName, cfName, cacheType);
}
- public Writer getWriter()
+ public Writer getWriter(int keysToSave)
{
- return new Writer(tableName, cfName);
+ return new Writer(tableName, cfName, keysToSave);
}
- public void scheduleSaving(int savePeriodInSeconds)
+ public void scheduleSaving(int savePeriodInSeconds, final int keysToSave)
{
if (saveTask != null)
{
@@ -95,7 +95,7 @@ public abstract class AutoSavingCache<K,
{
public void runMayThrow()
{
- submitWrite();
+ submitWrite(keysToSave);
}
};
saveTask = StorageService.tasks.scheduleWithFixedDelay(runnable,
@@ -105,9 +105,9 @@ public abstract class AutoSavingCache<K,
}
}
- public Future<?> submitWrite()
+ public Future<?> submitWrite(int keysToSave)
{
- return CompactionManager.instance.submitCacheWrite(getWriter());
+ return CompactionManager.instance.submitCacheWrite(getWriter(keysToSave));
}
public Set<DecoratedKey> readSaved()
@@ -195,9 +195,12 @@ public abstract class AutoSavingCache<K,
private final long estimatedTotalBytes;
private long bytesWritten;
- private Writer(String ksname, String cfname)
+ private Writer(String ksname, String cfname, int keysToSave)
{
- keys = getKeySet();
+ if (keysToSave >= getKeySet().size())
+ keys = getKeySet();
+ else
+ keys = hotKeySet(keysToSave);
long bytes = 0;
for (K key : keys)
bytes += translateKey(key).remaining();
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Tue Jul 26 18:42:37 2011
@@ -96,6 +96,11 @@ public class ConcurrentLinkedHashCache<K
return map.keySet();
}
+ public Set<K> hotKeySet(int n)
+ {
+ return map.descendingKeySetWithLimit(n);
+ }
+
public boolean isPutCopying()
{
return false;
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Tue Jul 26 18:42:37 2011
@@ -46,6 +46,8 @@ public interface ICache<K, V>
public Set<K> keySet();
+ public Set<K> hotKeySet(int n);
+
/**
* @return true if the cache implementation inherently copies the cached values; otherwise,
* the caller should copy manually before caching shared values like Thrift ByteBuffers.
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Tue Jul 26 18:42:37 2011
@@ -150,6 +150,11 @@ public class InstrumentingCache<K, V> im
return map.keySet();
}
+ public Set<K> hotKeySet(int n)
+ {
+ return map.hotKeySet(n);
+ }
+
public boolean isPutCopying()
{
return map.isPutCopying();
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java Tue Jul 26 18:42:37 2011
@@ -163,6 +163,11 @@ public class SerializingCache<K, V> impl
return map.keySet();
}
+ public Set<K> hotKeySet(int n)
+ {
+ return map.descendingKeySetWithLimit(n);
+ }
+
public boolean isPutCopying()
{
return true;
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Jul 26 18:42:37 2011
@@ -119,6 +119,7 @@ public class CliClient
COMMENT,
ROWS_CACHED,
ROW_CACHE_SAVE_PERIOD,
+ ROW_CACHE_KEYS_TO_SAVE,
KEYS_CACHED,
KEY_CACHE_SAVE_PERIOD,
READ_REPAIR_CHANCE,
@@ -1231,6 +1232,9 @@ public class CliClient
case KEY_CACHE_SAVE_PERIOD:
cfDef.setKey_cache_save_period_in_seconds(Integer.parseInt(mValue));
break;
+ case ROW_CACHE_KEYS_TO_SAVE:
+ cfDef.setRow_cache_keys_to_save(Integer.parseInt(mValue));
+ break;
case DEFAULT_VALIDATION_CLASS:
cfDef.setDefault_validation_class(CliUtils.unescapeSQLString(mValue));
break;
@@ -1716,7 +1720,9 @@ public class CliClient
if (cf_def.default_validation_class != null)
sessionState.out.printf(" Default column value validator: %s%n", cf_def.default_validation_class);
sessionState.out.printf(" Columns sorted by: %s%s%n", cf_def.comparator_type, cf_def.column_type.equals("Super") ? "/" + cf_def.subcomparator_type : "");
- sessionState.out.printf(" Row cache size / save period in seconds: %s/%s%n", cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds);
+ sessionState.out.printf(" Row cache size / save period in seconds / keys to save : %s/%s/%s%n",
+ cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds,
+ cf_def.row_cache_keys_to_save == Integer.MAX_VALUE ? "all" : cf_def.row_cache_keys_to_save);
sessionState.out.printf(" Key cache size / save period in seconds: %s/%s%n", cf_def.key_cache_size, cf_def.key_cache_save_period_in_seconds);
sessionState.out.printf(" Memtable thresholds: %s/%s (millions of ops/MB)%n",
cf_def.memtable_operations_in_millions, cf_def.memtable_throughput_in_mb);
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jul 26 18:42:37 2011
@@ -70,6 +70,7 @@ public final class CFMetaData
public final static int DEFAULT_SYSTEM_MEMTABLE_THROUGHPUT_IN_MB = 8;
public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 * 3600;
+ public final static int DEFAULT_ROW_CACHE_KEYS_TO_SAVE = Integer.MAX_VALUE;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
@@ -164,6 +165,7 @@ public final class CFMetaData
private int maxCompactionThreshold; // default 32
private int rowCacheSavePeriodInSeconds; // default 0 (off)
private int keyCacheSavePeriodInSeconds; // default 3600 (1 hour)
+ private int rowCacheKeysToSave; // default max int (aka feature is off)
private int memtableThroughputInMb; // default based on heap size
private double memtableOperationsInMillions; // default based on throughput
private double mergeShardsChance; // default 0.1, chance [0.0, 1.0] of merging old shards during replication
@@ -186,6 +188,7 @@ public final class CFMetaData
public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
public CFMetaData rowCacheSavePeriod(int prop) {rowCacheSavePeriodInSeconds = prop; return this;}
public CFMetaData keyCacheSavePeriod(int prop) {keyCacheSavePeriodInSeconds = prop; return this;}
+ public CFMetaData rowCacheKeysToSave(int prop) {rowCacheKeysToSave = prop; return this;}
public CFMetaData memSize(int prop) {memtableThroughputInMb = prop; return this;}
public CFMetaData memOps(double prop) {memtableOperationsInMillions = prop; return this;}
public CFMetaData mergeShardsChance(double prop) {mergeShardsChance = prop; return this;}
@@ -231,6 +234,7 @@ public final class CFMetaData
// Set a bunch of defaults
rowCacheSize = DEFAULT_ROW_CACHE_SIZE;
keyCacheSize = DEFAULT_KEY_CACHE_SIZE;
+ rowCacheKeysToSave = DEFAULT_ROW_CACHE_KEYS_TO_SAVE;
readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE;
gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
@@ -319,6 +323,7 @@ public final class CFMetaData
.maxCompactionThreshold(oldCFMD.maxCompactionThreshold)
.rowCacheSavePeriod(oldCFMD.rowCacheSavePeriodInSeconds)
.keyCacheSavePeriod(oldCFMD.keyCacheSavePeriodInSeconds)
+ .rowCacheKeysToSave(oldCFMD.rowCacheKeysToSave)
.memSize(oldCFMD.memtableThroughputInMb)
.memOps(oldCFMD.memtableOperationsInMillions)
.columnMetadata(oldCFMD.column_metadata)
@@ -368,6 +373,7 @@ public final class CFMetaData
cf.max_compaction_threshold = maxCompactionThreshold;
cf.row_cache_save_period_in_seconds = rowCacheSavePeriodInSeconds;
cf.key_cache_save_period_in_seconds = keyCacheSavePeriodInSeconds;
+ cf.row_cache_keys_to_save = rowCacheKeysToSave;
cf.memtable_throughput_in_mb = memtableThroughputInMb;
cf.memtable_operations_in_millions = memtableOperationsInMillions;
cf.merge_shards_chance = mergeShardsChance;
@@ -430,6 +436,7 @@ public final class CFMetaData
if (cf.max_compaction_threshold != null) { newCFMD.maxCompactionThreshold(cf.max_compaction_threshold); }
if (cf.row_cache_save_period_in_seconds != null) { newCFMD.rowCacheSavePeriod(cf.row_cache_save_period_in_seconds); }
if (cf.key_cache_save_period_in_seconds != null) { newCFMD.keyCacheSavePeriod(cf.key_cache_save_period_in_seconds); }
+ if (cf.row_cache_keys_to_save != null) { newCFMD.rowCacheKeysToSave(cf.row_cache_keys_to_save); }
if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); }
if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); }
if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); }
@@ -538,6 +545,11 @@ public final class CFMetaData
return keyCacheSavePeriodInSeconds;
}
+ public int getRowCacheKeysToSave()
+ {
+ return rowCacheKeysToSave;
+ }
+
public int getMemtableThroughputInMb()
{
return memtableThroughputInMb;
@@ -600,6 +612,7 @@ public final class CFMetaData
.append(column_metadata, rhs.column_metadata)
.append(rowCacheSavePeriodInSeconds, rhs.rowCacheSavePeriodInSeconds)
.append(keyCacheSavePeriodInSeconds, rhs.keyCacheSavePeriodInSeconds)
+ .append(rowCacheKeysToSave, rhs.rowCacheKeysToSave)
.append(memtableThroughputInMb, rhs.memtableThroughputInMb)
.append(memtableOperationsInMillions, rhs.memtableOperationsInMillions)
.append(mergeShardsChance, rhs.mergeShardsChance)
@@ -631,6 +644,7 @@ public final class CFMetaData
.append(column_metadata)
.append(rowCacheSavePeriodInSeconds)
.append(keyCacheSavePeriodInSeconds)
+ .append(rowCacheKeysToSave)
.append(memtableThroughputInMb)
.append(memtableOperationsInMillions)
.append(mergeShardsChance)
@@ -669,6 +683,8 @@ public final class CFMetaData
cf_def.setRow_cache_save_period_in_seconds(CFMetaData.DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS);
if (!cf_def.isSetKey_cache_save_period_in_seconds())
cf_def.setKey_cache_save_period_in_seconds(CFMetaData.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS);
+ if (!cf_def.isSetRow_cache_keys_to_save())
+ cf_def.setRow_cache_keys_to_save(CFMetaData.DEFAULT_ROW_CACHE_KEYS_TO_SAVE);
if (!cf_def.isSetMemtable_throughput_in_mb())
cf_def.setMemtable_throughput_in_mb(CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
if (!cf_def.isSetMemtable_operations_in_millions())
@@ -704,6 +720,7 @@ public final class CFMetaData
if (cf_def.isSetMax_compaction_threshold()) { newCFMD.maxCompactionThreshold(cf_def.max_compaction_threshold); }
if (cf_def.isSetRow_cache_save_period_in_seconds()) { newCFMD.rowCacheSavePeriod(cf_def.row_cache_save_period_in_seconds); }
if (cf_def.isSetKey_cache_save_period_in_seconds()) { newCFMD.keyCacheSavePeriod(cf_def.key_cache_save_period_in_seconds); }
+ if (cf_def.isSetRow_cache_keys_to_save()) { newCFMD.rowCacheKeysToSave(cf_def.row_cache_keys_to_save); }
if (cf_def.isSetMemtable_throughput_in_mb()) { newCFMD.memSize(cf_def.memtable_throughput_in_mb); }
if (cf_def.isSetMemtable_operations_in_millions()) { newCFMD.memOps(cf_def.memtable_operations_in_millions); }
if (cf_def.isSetMerge_shards_chance()) { newCFMD.mergeShardsChance(cf_def.merge_shards_chance); }
@@ -776,6 +793,7 @@ public final class CFMetaData
maxCompactionThreshold = cf_def.max_compaction_threshold;
rowCacheSavePeriodInSeconds = cf_def.row_cache_save_period_in_seconds;
keyCacheSavePeriodInSeconds = cf_def.key_cache_save_period_in_seconds;
+ rowCacheKeysToSave = cf_def.row_cache_keys_to_save;
memtableThroughputInMb = cf_def.memtable_throughput_in_mb;
memtableOperationsInMillions = cf_def.memtable_operations_in_millions;
mergeShardsChance = cf_def.merge_shards_chance;
@@ -895,6 +913,7 @@ public final class CFMetaData
def.setMax_compaction_threshold(cfm.maxCompactionThreshold);
def.setRow_cache_save_period_in_seconds(cfm.rowCacheSavePeriodInSeconds);
def.setKey_cache_save_period_in_seconds(cfm.keyCacheSavePeriodInSeconds);
+ def.setRow_cache_keys_to_save(cfm.rowCacheKeysToSave);
def.setMemtable_throughput_in_mb(cfm.memtableThroughputInMb);
def.setMemtable_operations_in_millions(cfm.memtableOperationsInMillions);
def.setMerge_shards_chance(cfm.mergeShardsChance);
@@ -941,6 +960,7 @@ public final class CFMetaData
def.max_compaction_threshold = cfm.maxCompactionThreshold;
def.row_cache_save_period_in_seconds = cfm.rowCacheSavePeriodInSeconds;
def.key_cache_save_period_in_seconds = cfm.keyCacheSavePeriodInSeconds;
+ def.row_cache_keys_to_save = cfm.rowCacheKeysToSave;
def.memtable_throughput_in_mb = cfm.memtableThroughputInMb;
def.memtable_operations_in_millions = cfm.memtableOperationsInMillions;
def.merge_shards_chance = cfm.mergeShardsChance;
@@ -986,6 +1006,7 @@ public final class CFMetaData
newDef.read_repair_chance = def.getRead_repair_chance();
newDef.replicate_on_write = def.isReplicate_on_write();
newDef.row_cache_save_period_in_seconds = def.getRow_cache_save_period_in_seconds();
+ newDef.row_cache_keys_to_save = def.getRow_cache_keys_to_save();
newDef.row_cache_size = def.getRow_cache_size();
newDef.subcomparator_type = def.getSubcomparator_type();
newDef.merge_shards_chance = def.getMerge_shards_chance();
@@ -1120,6 +1141,7 @@ public final class CFMetaData
.append("maxCompactionThreshold", maxCompactionThreshold)
.append("rowCacheSavePeriodInSeconds", rowCacheSavePeriodInSeconds)
.append("keyCacheSavePeriodInSeconds", keyCacheSavePeriodInSeconds)
+ .append("rowCacheKeysToSave", rowCacheKeysToSave)
.append("memtableThroughputInMb", memtableThroughputInMb)
.append("memtableOperationsInMillions", memtableOperationsInMillions)
.append("mergeShardsChance", mergeShardsChance)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jul 26 18:42:37 2011
@@ -142,6 +142,7 @@ public class ColumnFamilyStore implement
private volatile DefaultDouble memops;
private volatile DefaultInteger rowCacheSaveInSeconds;
private volatile DefaultInteger keyCacheSaveInSeconds;
+ private volatile DefaultInteger rowCacheKeysToSave;
/** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
public final Lock flushLock = new ReentrantLock();
@@ -195,11 +196,13 @@ public class ColumnFamilyStore implement
rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
if (!keyCacheSaveInSeconds.isModified())
keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
+ if (!rowCacheKeysToSave.isModified())
+ rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave());
compactionStrategy = metadata.createCompactionStrategyInstance(this);
updateCacheSizes();
- scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+ scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
// figure out what needs to be added and dropped.
// future: if/when we have modifiable settings for secondary indexes, they'll need to be handled here.
@@ -241,6 +244,7 @@ public class ColumnFamilyStore implement
this.memops = new DefaultDouble(metadata.getMemtableOperationsInMillions());
this.rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
+ this.rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave());
this.partitioner = partitioner;
fileIndexGenerator.set(generation);
@@ -542,13 +546,13 @@ public class ColumnFamilyStore implement
table.name,
columnFamily));
- scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds());
+ scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds(), metadata.getRowCacheKeysToSave());
}
- public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds)
+ public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds, int rowCacheKeysToSave)
{
- keyCache.scheduleSaving(keyCacheSavePeriodInSeconds);
- rowCache.scheduleSaving(rowCacheSavePeriodInSeconds);
+ keyCache.scheduleSaving(keyCacheSavePeriodInSeconds, Integer.MAX_VALUE);
+ rowCache.scheduleSaving(rowCacheSavePeriodInSeconds, rowCacheKeysToSave);
}
public AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache()
@@ -1985,6 +1989,7 @@ public class ColumnFamilyStore implement
- get/set memtime
- get/set rowCacheSavePeriodInSeconds
- get/set keyCacheSavePeriodInSeconds
+ - get/set rowCacheKeysToSave
*/
public AbstractCompactionStrategy getCompactionStrategy()
@@ -2056,7 +2061,7 @@ public class ColumnFamilyStore implement
throw new RuntimeException("RowCacheSavePeriodInSeconds must be non-negative.");
}
this.rowCacheSaveInSeconds.set(rcspis);
- scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+ scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
}
public int getKeyCacheSavePeriodInSeconds()
@@ -2070,7 +2075,19 @@ public class ColumnFamilyStore implement
throw new RuntimeException("KeyCacheSavePeriodInSeconds must be non-negative.");
}
this.keyCacheSaveInSeconds.set(kcspis);
- scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+ scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
+ }
+
+ public int getRowCacheKeysToSave()
+ {
+ return rowCacheKeysToSave.value();
+ }
+
+ public void setRowCacheKeysToSave(int keysToSave)
+ {
+ this.rowCacheKeysToSave.set(keysToSave);
+ // scheduleCacheSaving() takes the limit as a plain int, so re-schedule (as the sibling setters do) to pass the new value on
+ scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
}
// End JMX get/set.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Tue Jul 26 18:42:37 2011
@@ -231,4 +231,7 @@ public interface ColumnFamilyStoreMBean
public int getKeyCacheSavePeriodInSeconds();
public void setKeyCacheSavePeriodInSeconds(int kcspis);
+
+ public int getRowCacheKeysToSave();
+ public void setRowCacheKeysToSave(int keysToSave);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Tue Jul 26 18:42:37 2011
@@ -159,6 +159,7 @@ public abstract class Migration
assert !StorageService.instance.isClientMode();
assert column != null;
MigrationManager.announce(column);
+ passiveAnnounce(); // keeps gossip in sync w/ what we just told everyone
}
public final void passiveAnnounce()
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 26 18:42:37 2011
@@ -2216,8 +2216,8 @@ public class StorageService implements I
logger_.debug("submitting cache saves");
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- futures.add(cfs.keyCache.submitWrite());
- futures.add(cfs.rowCache.submitWrite());
+ futures.add(cfs.keyCache.submitWrite(-1));
+ futures.add(cfs.rowCache.submitWrite(cfs.getRowCacheKeysToSave()));
}
FBUtilities.waitOnFutures(futures);
logger_.debug("cache saves completed");
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Jul 26 18:42:37 2011
@@ -84,7 +84,7 @@ public class KeyCacheTest extends Cleanu
}
// force the cache to disk
- store.keyCache.submitWrite().get();
+ store.keyCache.submitWrite(Integer.MAX_VALUE).get();
// empty the cache again to make sure values came from disk
store.invalidateKeyCache();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Tue Jul 26 18:42:37 2011
@@ -114,6 +114,19 @@ public class RowCacheTest extends Cleanu
@Test
public void testRowCacheLoad() throws Exception
{
+ rowCacheLoad(100, 100, Integer.MAX_VALUE);
+ }
+
+
+ @Test
+ public void testRowCachePartialLoad() throws Exception
+ {
+ rowCacheLoad(100, 50, 50);
+ }
+
+
+ public void rowCacheLoad(int totalKeys, int expectedKeys, int keysToSave) throws Exception
+ {
CompactionManager.instance.disableAutoCompaction();
ColumnFamilyStore store = Table.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
@@ -123,12 +136,12 @@ public class RowCacheTest extends Cleanu
assert store.getRowCacheSize() == 0;
// insert data and fill the cache
- insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
- readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
- assert store.getRowCacheSize() == 100;
+ insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys);
+ readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys);
+ assert store.getRowCacheSize() == totalKeys;
// force the cache to disk
- store.rowCache.submitWrite().get();
+ store.rowCache.submitWrite(keysToSave).get();
// empty the cache again to make sure values came from disk
store.invalidateRowCache();
@@ -136,12 +149,28 @@ public class RowCacheTest extends Cleanu
// load the cache from disk
store.initCaches();
- assert store.getRowCacheSize() == 100;
+ assert store.getRowCacheSize() == expectedKeys;
- for (int i = 0; i < 100; i++)
+ // If we are loading less than the entire cache back, we can't
+ // be sure which rows we will get if all rows are equally hot.
+ int nulls = 0;
+ int nonNull = 0;
+ for (int i = 0; i < totalKeys; i++)
{
- // verify the correct data was found
- assert store.getRawCachedRow(Util.dk("key" + i)).getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
+ // probe every inserted key: verify the data itself when the whole
+ // cache was saved, otherwise only count how many rows came back
+ // (assumes getRawCachedRow() reads the cache without populating it).
+ ColumnFamily row = store.getRawCachedRow(Util.dk("key" + i));
+ if (expectedKeys == totalKeys)
+ {
+ assert row != null;
+ assert row.getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
+ }
+ if (row == null)
+ nulls++;
+ else
+ nonNull++;
}
+ assert nonNull == expectedKeys : nonNull + " rows cached, " + nulls + " missing";
}
}