You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/26 20:42:41 UTC

svn commit: r1151205 - in /cassandra/trunk: ./ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache...

Author: jbellis
Date: Tue Jul 26 18:42:37 2011
New Revision: 1151205

URL: http://svn.apache.org/viewvc?rev=1151205&view=rev
Log:
add row_cache_keys_to_save CF option
patch by Chris Burroughs; reviewed by jbellis for CASSANDRA-1966

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
    cassandra/trunk/src/avro/internode.genavro
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jul 26 18:42:37 2011
@@ -18,6 +18,7 @@
  * store hints as serialized mutations instead of pointers to data rows
  * store hints in the coordinator node instead of in the closest 
    replica (CASSANDRA-2914). 
+ * add row_cache_keys_to_save CF option (CASSANDRA-1966)
 
 
 0.8.2

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Jul 26 18:42:37 2011
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
 #           for every edit that doesn't result in a change to major/minor.
 #
 # See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "19.10.0"
+const string VERSION = "19.11.0"
 
 
 #
@@ -396,6 +396,7 @@ struct CfDef {
     28: optional binary key_alias,
     29: optional string compaction_strategy,
     30: optional map<string,string> compaction_strategy_options,
+    31: optional i32 row_cache_keys_to_save,
 }
 
 /* describes a keyspace. */

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Tue Jul 26 18:42:37 2011
@@ -9086,6 +9086,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -17041,8 +17043,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -25752,6 +25752,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Jul 26 18:42:37 2011
@@ -71,6 +71,7 @@ public class CfDef implements org.apache
   private static final org.apache.thrift.protocol.TField KEY_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("key_alias", org.apache.thrift.protocol.TType.STRING, (short)28);
   private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy", org.apache.thrift.protocol.TType.STRING, (short)29);
   private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy_options", org.apache.thrift.protocol.TType.MAP, (short)30);
+  private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31);
 
   public String keyspace;
   public String name;
@@ -98,6 +99,7 @@ public class CfDef implements org.apache
   public ByteBuffer key_alias;
   public String compaction_strategy;
   public Map<String,String> compaction_strategy_options;
+  public int row_cache_keys_to_save;
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -126,7 +128,8 @@ public class CfDef implements org.apache
     ROW_CACHE_PROVIDER((short)27, "row_cache_provider"),
     KEY_ALIAS((short)28, "key_alias"),
     COMPACTION_STRATEGY((short)29, "compaction_strategy"),
-    COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options");
+    COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"),
+    ROW_CACHE_KEYS_TO_SAVE((short)31, "row_cache_keys_to_save");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -193,6 +196,8 @@ public class CfDef implements org.apache
           return COMPACTION_STRATEGY;
         case 30: // COMPACTION_STRATEGY_OPTIONS
           return COMPACTION_STRATEGY_OPTIONS;
+        case 31: // ROW_CACHE_KEYS_TO_SAVE
+          return ROW_CACHE_KEYS_TO_SAVE;
         default:
           return null;
       }
@@ -246,7 +251,8 @@ public class CfDef implements org.apache
   private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 10;
   private static final int __REPLICATE_ON_WRITE_ISSET_ID = 11;
   private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 12;
-  private BitSet __isset_bit_vector = new BitSet(13);
+  private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 13;
+  private BitSet __isset_bit_vector = new BitSet(14);
 
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -306,6 +312,8 @@ public class CfDef implements org.apache
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    tmpMap.put(_Fields.ROW_CACHE_KEYS_TO_SAVE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_keys_to_save", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap);
   }
@@ -409,6 +417,7 @@ public class CfDef implements org.apache
       }
       this.compaction_strategy_options = __this__compaction_strategy_options;
     }
+    this.row_cache_keys_to_save = other.row_cache_keys_to_save;
   }
 
   public CfDef deepCopy() {
@@ -459,6 +468,8 @@ public class CfDef implements org.apache
     this.key_alias = null;
     this.compaction_strategy = null;
     this.compaction_strategy_options = null;
+    setRow_cache_keys_to_saveIsSet(false);
+    this.row_cache_keys_to_save = 0;
   }
 
   public String getKeyspace() {
@@ -1108,6 +1119,29 @@ public class CfDef implements org.apache
     }
   }
 
+  public int getRow_cache_keys_to_save() {
+    return this.row_cache_keys_to_save;
+  }
+
+  public CfDef setRow_cache_keys_to_save(int row_cache_keys_to_save) {
+    this.row_cache_keys_to_save = row_cache_keys_to_save;
+    setRow_cache_keys_to_saveIsSet(true);
+    return this;
+  }
+
+  public void unsetRow_cache_keys_to_save() {
+    __isset_bit_vector.clear(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID);
+  }
+
+  /** Returns true if field row_cache_keys_to_save is set (has been assigned a value) and false otherwise */
+  public boolean isSetRow_cache_keys_to_save() {
+    return __isset_bit_vector.get(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID);
+  }
+
+  public void setRow_cache_keys_to_saveIsSet(boolean value) {
+    __isset_bit_vector.set(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case KEYSPACE:
@@ -1318,6 +1352,14 @@ public class CfDef implements org.apache
       }
       break;
 
+    case ROW_CACHE_KEYS_TO_SAVE:
+      if (value == null) {
+        unsetRow_cache_keys_to_save();
+      } else {
+        setRow_cache_keys_to_save((Integer)value);
+      }
+      break;
+
     }
   }
 
@@ -1401,6 +1443,9 @@ public class CfDef implements org.apache
     case COMPACTION_STRATEGY_OPTIONS:
       return getCompaction_strategy_options();
 
+    case ROW_CACHE_KEYS_TO_SAVE:
+      return new Integer(getRow_cache_keys_to_save());
+
     }
     throw new IllegalStateException();
   }
@@ -1464,6 +1509,8 @@ public class CfDef implements org.apache
       return isSetCompaction_strategy();
     case COMPACTION_STRATEGY_OPTIONS:
       return isSetCompaction_strategy_options();
+    case ROW_CACHE_KEYS_TO_SAVE:
+      return isSetRow_cache_keys_to_save();
     }
     throw new IllegalStateException();
   }
@@ -1715,6 +1762,15 @@ public class CfDef implements org.apache
         return false;
     }
 
+    boolean this_present_row_cache_keys_to_save = true && this.isSetRow_cache_keys_to_save();
+    boolean that_present_row_cache_keys_to_save = true && that.isSetRow_cache_keys_to_save();
+    if (this_present_row_cache_keys_to_save || that_present_row_cache_keys_to_save) {
+      if (!(this_present_row_cache_keys_to_save && that_present_row_cache_keys_to_save))
+        return false;
+      if (this.row_cache_keys_to_save != that.row_cache_keys_to_save)
+        return false;
+    }
+
     return true;
   }
 
@@ -1852,6 +1908,11 @@ public class CfDef implements org.apache
     if (present_compaction_strategy_options)
       builder.append(compaction_strategy_options);
 
+    boolean present_row_cache_keys_to_save = true && (isSetRow_cache_keys_to_save());
+    builder.append(present_row_cache_keys_to_save);
+    if (present_row_cache_keys_to_save)
+      builder.append(row_cache_keys_to_save);
+
     return builder.toHashCode();
   }
 
@@ -2123,6 +2184,16 @@ public class CfDef implements org.apache
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetRow_cache_keys_to_save()).compareTo(typedOther.isSetRow_cache_keys_to_save());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRow_cache_keys_to_save()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_cache_keys_to_save, typedOther.row_cache_keys_to_save);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -2358,6 +2429,14 @@ public class CfDef implements org.apache
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 31: // ROW_CACHE_KEYS_TO_SAVE
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.row_cache_keys_to_save = iprot.readI32();
+            setRow_cache_keys_to_saveIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
       }
@@ -2540,6 +2619,11 @@ public class CfDef implements org.apache
         oprot.writeFieldEnd();
       }
     }
+    if (isSetRow_cache_keys_to_save()) {
+      oprot.writeFieldBegin(ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC);
+      oprot.writeI32(this.row_cache_keys_to_save);
+      oprot.writeFieldEnd();
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -2752,6 +2836,12 @@ public class CfDef implements org.apache
       }
       first = false;
     }
+    if (isSetRow_cache_keys_to_save()) {
+      if (!first) sb.append(", ");
+      sb.append("row_cache_keys_to_save:");
+      sb.append(this.row_cache_keys_to_save);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java Tue Jul 26 18:42:37 2011
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.10.0";
+  public static final String VERSION = "19.11.0";
 
 }

Modified: cassandra/trunk/src/avro/internode.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/avro/internode.genavro (original)
+++ cassandra/trunk/src/avro/internode.genavro Tue Jul 26 18:42:37 2011
@@ -57,6 +57,7 @@ protocol InterNode {
         union { null, int } max_compaction_threshold = null;
         union { int, null } row_cache_save_period_in_seconds = 0;
         union { int, null } key_cache_save_period_in_seconds = 3600;
+        union { int, null } row_cache_keys_to_save = null;
         union { null, int } memtable_throughput_in_mb = null;
         union { null, double} memtable_operations_in_millions = null;
         union { null, double} merge_shards_chance = null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Tue Jul 26 18:42:37 2011
@@ -77,12 +77,12 @@ public abstract class AutoSavingCache<K,
         return DatabaseDescriptor.getSerializedCachePath(tableName, cfName, cacheType);
     }
 
-    public Writer getWriter()
+    public Writer getWriter(int keysToSave)
     {
-        return new Writer(tableName, cfName);
+        return new Writer(tableName, cfName, keysToSave);
     }
 
-    public void scheduleSaving(int savePeriodInSeconds)
+    public void scheduleSaving(int savePeriodInSeconds, final int keysToSave)
     {
         if (saveTask != null)
         {
@@ -95,7 +95,7 @@ public abstract class AutoSavingCache<K,
             {
                 public void runMayThrow()
                 {
-                    submitWrite();
+                    submitWrite(keysToSave);
                 }
             };
             saveTask = StorageService.tasks.scheduleWithFixedDelay(runnable,
@@ -105,9 +105,9 @@ public abstract class AutoSavingCache<K,
         }
     }
 
-    public Future<?> submitWrite()
+    public Future<?> submitWrite(int keysToSave)
     {
-        return CompactionManager.instance.submitCacheWrite(getWriter());
+        return CompactionManager.instance.submitCacheWrite(getWriter(keysToSave));
     }
 
     public Set<DecoratedKey> readSaved()
@@ -195,9 +195,12 @@ public abstract class AutoSavingCache<K,
         private final long estimatedTotalBytes;
         private long bytesWritten;
 
-        private Writer(String ksname, String cfname)
+        private Writer(String ksname, String cfname, int keysToSave)
         {
-            keys = getKeySet();
+            if (keysToSave >= getKeySet().size())
+                keys = getKeySet();
+            else
+                keys = hotKeySet(keysToSave);
             long bytes = 0;
             for (K key : keys)
                 bytes += translateKey(key).remaining();

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java Tue Jul 26 18:42:37 2011
@@ -96,6 +96,11 @@ public class ConcurrentLinkedHashCache<K
         return map.keySet();
     }
 
+    public Set<K> hotKeySet(int n)
+    {
+        return map.descendingKeySetWithLimit(n);
+    }
+
     public boolean isPutCopying()
     {
         return false;

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/ICache.java Tue Jul 26 18:42:37 2011
@@ -46,6 +46,8 @@ public interface ICache<K, V>
 
     public Set<K> keySet();
 
+    public Set<K> hotKeySet(int n);
+
     /**
      * @return true if the cache implementation inherently copies the cached values; otherwise,
      * the caller should copy manually before caching shared values like Thrift ByteBuffers.

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentingCache.java Tue Jul 26 18:42:37 2011
@@ -150,6 +150,11 @@ public class InstrumentingCache<K, V> im
         return map.keySet();
     }
 
+    public Set<K> hotKeySet(int n)
+    {
+        return map.hotKeySet(n);
+    }
+
     public boolean isPutCopying()
     {
         return map.isPutCopying();

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCache.java Tue Jul 26 18:42:37 2011
@@ -163,6 +163,11 @@ public class SerializingCache<K, V> impl
         return map.keySet();
     }
 
+    public Set<K> hotKeySet(int n)
+    {
+        return map.descendingKeySetWithLimit(n);
+    }
+
     public boolean isPutCopying()
     {
         return true;

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Jul 26 18:42:37 2011
@@ -119,6 +119,7 @@ public class CliClient
         COMMENT,
         ROWS_CACHED,
         ROW_CACHE_SAVE_PERIOD,
+        ROW_CACHE_KEYS_TO_SAVE,
         KEYS_CACHED,
         KEY_CACHE_SAVE_PERIOD,
         READ_REPAIR_CHANCE,
@@ -1231,6 +1232,9 @@ public class CliClient
             case KEY_CACHE_SAVE_PERIOD:
                 cfDef.setKey_cache_save_period_in_seconds(Integer.parseInt(mValue));
                 break;
+            case ROW_CACHE_KEYS_TO_SAVE:
+                cfDef.setRow_cache_keys_to_save(Integer.parseInt(mValue));
+                break;
             case DEFAULT_VALIDATION_CLASS:
                 cfDef.setDefault_validation_class(CliUtils.unescapeSQLString(mValue));
                 break;
@@ -1716,7 +1720,9 @@ public class CliClient
                 if (cf_def.default_validation_class != null)
                     sessionState.out.printf("      Default column value validator: %s%n", cf_def.default_validation_class);
                 sessionState.out.printf("      Columns sorted by: %s%s%n", cf_def.comparator_type, cf_def.column_type.equals("Super") ? "/" + cf_def.subcomparator_type : "");
-                sessionState.out.printf("      Row cache size / save period in seconds: %s/%s%n", cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds);
+                sessionState.out.printf("      Row cache size / save period in seconds / keys to save : %s/%s/%s%n",
+                                        cf_def.row_cache_size, cf_def.row_cache_save_period_in_seconds,
+                                        cf_def.row_cache_keys_to_save == Integer.MAX_VALUE ? "all" : cf_def.row_cache_keys_to_save);
                 sessionState.out.printf("      Key cache size / save period in seconds: %s/%s%n", cf_def.key_cache_size, cf_def.key_cache_save_period_in_seconds);
                 sessionState.out.printf("      Memtable thresholds: %s/%s (millions of ops/MB)%n",
                                 cf_def.memtable_operations_in_millions, cf_def.memtable_throughput_in_mb);

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jul 26 18:42:37 2011
@@ -70,6 +70,7 @@ public final class CFMetaData
     public final static int DEFAULT_SYSTEM_MEMTABLE_THROUGHPUT_IN_MB = 8;
     public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
     public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 * 3600;
+    public final static int DEFAULT_ROW_CACHE_KEYS_TO_SAVE = Integer.MAX_VALUE;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
     public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
@@ -164,6 +165,7 @@ public final class CFMetaData
     private int maxCompactionThreshold;               // default 32
     private int rowCacheSavePeriodInSeconds;          // default 0 (off)
     private int keyCacheSavePeriodInSeconds;          // default 3600 (1 hour)
+    private int rowCacheKeysToSave;                   // default max int (aka feature is off)
     private int memtableThroughputInMb;               // default based on heap size
     private double memtableOperationsInMillions;      // default based on throughput
     private double mergeShardsChance;                 // default 0.1, chance [0.0, 1.0] of merging old shards during replication
@@ -186,6 +188,7 @@ public final class CFMetaData
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
     public CFMetaData rowCacheSavePeriod(int prop) {rowCacheSavePeriodInSeconds = prop; return this;}
     public CFMetaData keyCacheSavePeriod(int prop) {keyCacheSavePeriodInSeconds = prop; return this;}
+    public CFMetaData rowCacheKeysToSave(int prop) {rowCacheKeysToSave = prop; return this;}
     public CFMetaData memSize(int prop) {memtableThroughputInMb = prop; return this;}
     public CFMetaData memOps(double prop) {memtableOperationsInMillions = prop; return this;}
     public CFMetaData mergeShardsChance(double prop) {mergeShardsChance = prop; return this;}
@@ -231,6 +234,7 @@ public final class CFMetaData
         // Set a bunch of defaults
         rowCacheSize                 = DEFAULT_ROW_CACHE_SIZE;
         keyCacheSize                 = DEFAULT_KEY_CACHE_SIZE;
+        rowCacheKeysToSave           = DEFAULT_ROW_CACHE_KEYS_TO_SAVE;
         readRepairChance             = DEFAULT_READ_REPAIR_CHANCE;
         replicateOnWrite             = DEFAULT_REPLICATE_ON_WRITE;
         gcGraceSeconds               = DEFAULT_GC_GRACE_SECONDS;
@@ -319,6 +323,7 @@ public final class CFMetaData
                       .maxCompactionThreshold(oldCFMD.maxCompactionThreshold)
                       .rowCacheSavePeriod(oldCFMD.rowCacheSavePeriodInSeconds)
                       .keyCacheSavePeriod(oldCFMD.keyCacheSavePeriodInSeconds)
+                      .rowCacheKeysToSave(oldCFMD.rowCacheKeysToSave)
                       .memSize(oldCFMD.memtableThroughputInMb)
                       .memOps(oldCFMD.memtableOperationsInMillions)
                       .columnMetadata(oldCFMD.column_metadata)
@@ -368,6 +373,7 @@ public final class CFMetaData
         cf.max_compaction_threshold = maxCompactionThreshold;
         cf.row_cache_save_period_in_seconds = rowCacheSavePeriodInSeconds;
         cf.key_cache_save_period_in_seconds = keyCacheSavePeriodInSeconds;
+        cf.row_cache_keys_to_save = rowCacheKeysToSave;
         cf.memtable_throughput_in_mb = memtableThroughputInMb;
         cf.memtable_operations_in_millions = memtableOperationsInMillions;
         cf.merge_shards_chance = mergeShardsChance;
@@ -430,6 +436,7 @@ public final class CFMetaData
         if (cf.max_compaction_threshold != null) { newCFMD.maxCompactionThreshold(cf.max_compaction_threshold); }
         if (cf.row_cache_save_period_in_seconds != null) { newCFMD.rowCacheSavePeriod(cf.row_cache_save_period_in_seconds); }
         if (cf.key_cache_save_period_in_seconds != null) { newCFMD.keyCacheSavePeriod(cf.key_cache_save_period_in_seconds); }
+        if (cf.row_cache_keys_to_save != null) { newCFMD.rowCacheKeysToSave(cf.row_cache_keys_to_save); }
         if (cf.memtable_throughput_in_mb != null) { newCFMD.memSize(cf.memtable_throughput_in_mb); }
         if (cf.memtable_operations_in_millions != null) { newCFMD.memOps(cf.memtable_operations_in_millions); }
         if (cf.merge_shards_chance != null) { newCFMD.mergeShardsChance(cf.merge_shards_chance); }
@@ -538,6 +545,11 @@ public final class CFMetaData
         return keyCacheSavePeriodInSeconds;
     }
 
+    public int getRowCacheKeysToSave()
+    {
+        return rowCacheKeysToSave;
+    }
+
     public int getMemtableThroughputInMb()
     {
         return memtableThroughputInMb;
@@ -600,6 +612,7 @@ public final class CFMetaData
             .append(column_metadata, rhs.column_metadata)
             .append(rowCacheSavePeriodInSeconds, rhs.rowCacheSavePeriodInSeconds)
             .append(keyCacheSavePeriodInSeconds, rhs.keyCacheSavePeriodInSeconds)
+            .append(rowCacheKeysToSave, rhs.rowCacheKeysToSave)
             .append(memtableThroughputInMb, rhs.memtableThroughputInMb)
             .append(memtableOperationsInMillions, rhs.memtableOperationsInMillions)
             .append(mergeShardsChance, rhs.mergeShardsChance)
@@ -631,6 +644,7 @@ public final class CFMetaData
             .append(column_metadata)
             .append(rowCacheSavePeriodInSeconds)
             .append(keyCacheSavePeriodInSeconds)
+            .append(rowCacheKeysToSave)
             .append(memtableThroughputInMb)
             .append(memtableOperationsInMillions)
             .append(mergeShardsChance)
@@ -669,6 +683,8 @@ public final class CFMetaData
             cf_def.setRow_cache_save_period_in_seconds(CFMetaData.DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS);
         if (!cf_def.isSetKey_cache_save_period_in_seconds())
             cf_def.setKey_cache_save_period_in_seconds(CFMetaData.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS);
+        if (!cf_def.isSetRow_cache_keys_to_save())
+            cf_def.setRow_cache_keys_to_save(CFMetaData.DEFAULT_ROW_CACHE_KEYS_TO_SAVE);
         if (!cf_def.isSetMemtable_throughput_in_mb())
             cf_def.setMemtable_throughput_in_mb(CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
         if (!cf_def.isSetMemtable_operations_in_millions())
@@ -704,6 +720,7 @@ public final class CFMetaData
         if (cf_def.isSetMax_compaction_threshold()) { newCFMD.maxCompactionThreshold(cf_def.max_compaction_threshold); }
         if (cf_def.isSetRow_cache_save_period_in_seconds()) { newCFMD.rowCacheSavePeriod(cf_def.row_cache_save_period_in_seconds); }
         if (cf_def.isSetKey_cache_save_period_in_seconds()) { newCFMD.keyCacheSavePeriod(cf_def.key_cache_save_period_in_seconds); }
+        if (cf_def.isSetRow_cache_keys_to_save()) { newCFMD.rowCacheKeysToSave(cf_def.row_cache_keys_to_save); }
         if (cf_def.isSetMemtable_throughput_in_mb()) { newCFMD.memSize(cf_def.memtable_throughput_in_mb); }
         if (cf_def.isSetMemtable_operations_in_millions()) { newCFMD.memOps(cf_def.memtable_operations_in_millions); }
         if (cf_def.isSetMerge_shards_chance()) { newCFMD.mergeShardsChance(cf_def.merge_shards_chance); }
@@ -776,6 +793,7 @@ public final class CFMetaData
         maxCompactionThreshold = cf_def.max_compaction_threshold;
         rowCacheSavePeriodInSeconds = cf_def.row_cache_save_period_in_seconds;
         keyCacheSavePeriodInSeconds = cf_def.key_cache_save_period_in_seconds;
+        rowCacheKeysToSave = cf_def.row_cache_keys_to_save;
         memtableThroughputInMb = cf_def.memtable_throughput_in_mb;
         memtableOperationsInMillions = cf_def.memtable_operations_in_millions;
         mergeShardsChance = cf_def.merge_shards_chance;
@@ -895,6 +913,7 @@ public final class CFMetaData
         def.setMax_compaction_threshold(cfm.maxCompactionThreshold);
         def.setRow_cache_save_period_in_seconds(cfm.rowCacheSavePeriodInSeconds);
         def.setKey_cache_save_period_in_seconds(cfm.keyCacheSavePeriodInSeconds);
+        def.setRow_cache_keys_to_save(cfm.rowCacheKeysToSave);
         def.setMemtable_throughput_in_mb(cfm.memtableThroughputInMb);
         def.setMemtable_operations_in_millions(cfm.memtableOperationsInMillions);
         def.setMerge_shards_chance(cfm.mergeShardsChance);
@@ -941,6 +960,7 @@ public final class CFMetaData
         def.max_compaction_threshold = cfm.maxCompactionThreshold;
         def.row_cache_save_period_in_seconds = cfm.rowCacheSavePeriodInSeconds;
         def.key_cache_save_period_in_seconds = cfm.keyCacheSavePeriodInSeconds;
+        def.row_cache_keys_to_save = cfm.rowCacheKeysToSave;
         def.memtable_throughput_in_mb = cfm.memtableThroughputInMb;
         def.memtable_operations_in_millions = cfm.memtableOperationsInMillions;
         def.merge_shards_chance = cfm.mergeShardsChance;
@@ -986,6 +1006,7 @@ public final class CFMetaData
         newDef.read_repair_chance = def.getRead_repair_chance();
         newDef.replicate_on_write = def.isReplicate_on_write();
         newDef.row_cache_save_period_in_seconds = def.getRow_cache_save_period_in_seconds();
+        newDef.row_cache_keys_to_save = def.getRow_cache_keys_to_save();
         newDef.row_cache_size = def.getRow_cache_size();
         newDef.subcomparator_type = def.getSubcomparator_type();
         newDef.merge_shards_chance = def.getMerge_shards_chance();
@@ -1120,6 +1141,7 @@ public final class CFMetaData
             .append("maxCompactionThreshold", maxCompactionThreshold)
             .append("rowCacheSavePeriodInSeconds", rowCacheSavePeriodInSeconds)
             .append("keyCacheSavePeriodInSeconds", keyCacheSavePeriodInSeconds)
+            .append("rowCacheKeysToSave", rowCacheKeysToSave)
             .append("memtableThroughputInMb", memtableThroughputInMb)
             .append("memtableOperationsInMillions", memtableOperationsInMillions)
             .append("mergeShardsChance", mergeShardsChance)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jul 26 18:42:37 2011
@@ -142,6 +142,7 @@ public class ColumnFamilyStore implement
     private volatile DefaultDouble memops;
     private volatile DefaultInteger rowCacheSaveInSeconds;
     private volatile DefaultInteger keyCacheSaveInSeconds;
+    private volatile DefaultInteger rowCacheKeysToSave; 
 
     /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
     public final Lock flushLock = new ReentrantLock();
@@ -195,11 +196,13 @@ public class ColumnFamilyStore implement
             rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
         if (!keyCacheSaveInSeconds.isModified())
             keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
+        if (!rowCacheKeysToSave.isModified())
+            rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave());
 
         compactionStrategy = metadata.createCompactionStrategyInstance(this);
 
         updateCacheSizes();
-        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
         
         // figure out what needs to be added and dropped.
         // future: if/when we have modifiable settings for secondary indexes, they'll need to be handled here.
@@ -241,6 +244,7 @@ public class ColumnFamilyStore implement
         this.memops = new DefaultDouble(metadata.getMemtableOperationsInMillions());
         this.rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
         this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
+        this.rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave());
         this.partitioner = partitioner;
         fileIndexGenerator.set(generation);
 
@@ -542,13 +546,13 @@ public class ColumnFamilyStore implement
                                       table.name,
                                       columnFamily));
 
-        scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds());
+        scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds(), metadata.getRowCacheKeysToSave());
     }
 
-    public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds)
+    public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds, int rowCacheKeysToSave)
     {
-        keyCache.scheduleSaving(keyCacheSavePeriodInSeconds);
-        rowCache.scheduleSaving(rowCacheSavePeriodInSeconds);
+        keyCache.scheduleSaving(keyCacheSavePeriodInSeconds, Integer.MAX_VALUE);
+        rowCache.scheduleSaving(rowCacheSavePeriodInSeconds, rowCacheKeysToSave);
     }
 
     public AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache()
@@ -1985,6 +1989,7 @@ public class ColumnFamilyStore implement
        - get/set memtime
        - get/set rowCacheSavePeriodInSeconds
        - get/set keyCacheSavePeriodInSeconds
+       - get/set rowCacheKeysToSave
      */
 
     public AbstractCompactionStrategy getCompactionStrategy()
@@ -2056,7 +2061,7 @@ public class ColumnFamilyStore implement
             throw new RuntimeException("RowCacheSavePeriodInSeconds must be non-negative.");
         }
         this.rowCacheSaveInSeconds.set(rcspis);
-        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
     }
 
     public int getKeyCacheSavePeriodInSeconds()
@@ -2070,7 +2075,17 @@ public class ColumnFamilyStore implement
             throw new RuntimeException("KeyCacheSavePeriodInSeconds must be non-negative.");
         }
         this.keyCacheSaveInSeconds.set(kcspis);
-        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value());
+        scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
+    }
+
+    public int getRowCacheKeysToSave()
+    {
+        return rowCacheKeysToSave.value();
+    }
+
+    public void setRowCacheKeysToSave(int keysToSave)
+    {
+        this.rowCacheKeysToSave.set(keysToSave);
     }
     // End JMX get/set.
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Tue Jul 26 18:42:37 2011
@@ -231,4 +231,7 @@ public interface ColumnFamilyStoreMBean
 
     public int getKeyCacheSavePeriodInSeconds();
     public void setKeyCacheSavePeriodInSeconds(int kcspis);
+
+    public int getRowCacheKeysToSave();
+    public void setRowCacheKeysToSave(int keysToSave);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Tue Jul 26 18:42:37 2011
@@ -159,6 +159,7 @@ public abstract class Migration
         assert !StorageService.instance.isClientMode();
         assert column != null;
         MigrationManager.announce(column);
+        passiveAnnounce(); // keeps gossip in sync w/ what we just told everyone
     }
 
     public final void passiveAnnounce()

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 26 18:42:37 2011
@@ -2216,8 +2216,8 @@ public class StorageService implements I
         logger_.debug("submitting cache saves");
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            futures.add(cfs.keyCache.submitWrite());
-            futures.add(cfs.rowCache.submitWrite());
+            futures.add(cfs.keyCache.submitWrite(-1));
+            futures.add(cfs.rowCache.submitWrite(cfs.getRowCacheKeysToSave()));
         }
         FBUtilities.waitOnFutures(futures);
         logger_.debug("cache saves completed");

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Jul 26 18:42:37 2011
@@ -84,7 +84,7 @@ public class KeyCacheTest extends Cleanu
         }
 
         // force the cache to disk
-        store.keyCache.submitWrite().get();
+        store.keyCache.submitWrite(Integer.MAX_VALUE).get();
 
         // empty the cache again to make sure values came from disk
         store.invalidateKeyCache();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java?rev=1151205&r1=1151204&r2=1151205&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Tue Jul 26 18:42:37 2011
@@ -114,6 +114,19 @@ public class RowCacheTest extends Cleanu
     @Test
     public void testRowCacheLoad() throws Exception
     {
+        rowCacheLoad(100, 100, Integer.MAX_VALUE);
+    }
+
+
+    @Test
+    public void testRowCachePartialLoad() throws Exception
+    {
+        rowCacheLoad(100, 50, 50);
+    }
+
+
+    public void rowCacheLoad(int totalKeys, int expectedKeys, int keysToSave) throws Exception
+    {
         CompactionManager.instance.disableAutoCompaction();
 
         ColumnFamilyStore store = Table.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
@@ -123,12 +136,12 @@ public class RowCacheTest extends Cleanu
         assert store.getRowCacheSize() == 0;
 
         // insert data and fill the cache
-        insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
-        readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
-        assert store.getRowCacheSize() == 100;
+        insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys);
+        readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, totalKeys);
+        assert store.getRowCacheSize() == totalKeys;
 
         // force the cache to disk
-        store.rowCache.submitWrite().get();
+        store.rowCache.submitWrite(keysToSave).get();
 
         // empty the cache again to make sure values came from disk
         store.invalidateRowCache();
@@ -136,12 +149,28 @@ public class RowCacheTest extends Cleanu
 
         // load the cache from disk
         store.initCaches();
-        assert store.getRowCacheSize() == 100;
+        assert store.getRowCacheSize() == expectedKeys;
 
-        for (int i = 0; i < 100; i++)
+        // If we are loading less than the entire cache back, we can't
+        // be sure which rows we will get if all rows are equally hot.
+        int nulls = 0;
+        int nonNull = 0;
+        for (int i = 0; i < expectedKeys; i++)
         {
-            // verify the correct data was found
-            assert store.getRawCachedRow(Util.dk("key" + i)).getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
+            // verify the correct data was found when we expect to get
+            // back the entire cache.  Otherwise only make assertions
+            // about how many items are read back.
+            ColumnFamily row = store.getRawCachedRow(Util.dk("key" + i));
+            if (expectedKeys == totalKeys)
+            {
+                assert row != null;
+                assert row.getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
+            }
+            if (row == null)
+                nulls++;
+            else
+                nonNull++;
         }
+        assert nulls + nonNull == expectedKeys;
     }
 }