You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/16 22:05:01 UTC

[8/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a48466a,a13a52d..a3e7d12
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -21,9 -22,9 +21,8 @@@ import java.io.IOException
  import java.lang.management.ManagementFactory;
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
 -import java.util.Collection;
  import java.util.Iterator;
  import java.util.List;
- import java.util.UUID;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
@@@ -363,48 -360,33 +364,53 @@@ public class CacheService implements Ca
              ByteBufferUtil.writeWithLength(key.cellName, out);
          }
  
 -        public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
 +        public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
          {
+             //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+             //parameter so they aren't deserialized here, even though they are serialized by this serializer
              final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
 -            ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in);
 +            final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
+             if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+                 return null;
+             assert(cfs.metadata.isCounter());
 -            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer);
              return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
              {
                  public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                  {
 -                    DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
 -                    QueryFilter filter = QueryFilter.getNamesFilter(key,
 -                                                                    cfs.metadata.cfName,
 -                                                                    FBUtilities.singleton(cellName, cfs.metadata.comparator),
 -                                                                    Long.MIN_VALUE);
 -                    ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
 -                    if (cf == null)
 -                        return null;
 -                    Cell cell = cf.getColumn(cellName);
 -                    if (cell == null || !cell.isLive(Long.MIN_VALUE))
 -                        return null;
 -                    ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
 -                    return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
 +                    DecoratedKey key = cfs.decorateKey(partitionKey);
 +                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
 +                    ColumnDefinition column = name.column;
 +                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
 +
 +                    int nowInSec = FBUtilities.nowInSeconds();
 +                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
 +                    if (path == null)
 +                        builder.add(column);
 +                    else
 +                        builder.select(column, path);
 +
 +                    ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
 +                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
 +                    try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
 +                    {
 +                        Cell cell;
 +                        if (column.isStatic())
 +                        {
 +                            cell = iter.staticRow().getCell(column);
 +                        }
 +                        else
 +                        {
 +                            if (!iter.hasNext())
 +                                return null;
 +                            cell = iter.next().getCell(column);
 +                        }
 +
 +                        if (cell == null)
 +                            return null;
 +
 +                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                         return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
++                        return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, name.clustering, column, path), clockAndCount);
 +                    }
                  }
              });
          }
@@@ -412,27 -394,30 +418,34 @@@
  
      public static class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
      {
-         public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException
+         public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
          {
 -            assert(!cfs.isIndex());
++            assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes
+             out.write(cfs.metadata.ksAndCFBytes);
              ByteBufferUtil.writeWithLength(key.key, out);
          }
  
 -        public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
 +        public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
          {
+             //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+             //parameter so they aren't deserialized here, even though they are serialized by this serializer
              final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
 +            final int rowsToCache = cfs.metadata.params.caching.rowsPerPartitionToCache();
+             if (cfs == null  || !cfs.isRowCacheEnabled())
+                 return null;
 -            assert(!cfs.isIndex());
++            assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes
  
              return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
              {
                  public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                  {
 -                    DecoratedKey key = cfs.partitioner.decorateKey(buffer);
 -                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
 -                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
 -                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
 +                    DecoratedKey key = cfs.decorateKey(buffer);
 +                    int nowInSec = FBUtilities.nowInSeconds();
 +                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
 +                    {
 +                        CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
-                         return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
++                        return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
 +                    }
                  }
              });
          }
@@@ -453,11 -435,13 +463,13 @@@
              ByteBufferUtil.writeWithLength(key.key, out);
              out.writeInt(key.desc.generation);
              out.writeBoolean(true);
-             key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
 -            key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out);
++            key.desc.getFormat().getIndexSerializer(cfs.metadata, key.desc.version, SerializationHeader.forKeyCache(cfs.metadata)).serialize(entry, out);
          }
  
 -        public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
 +        public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
          {
+             //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+             //parameter so they aren't deserialized here, even though they are serialized by this serializer
              int keyLength = input.readInt();
              if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)
              {
@@@ -466,25 -450,18 +478,25 @@@
              }
              ByteBuffer key = ByteBufferUtil.read(input, keyLength);
              int generation = input.readInt();
-             SSTableReader reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL));
              input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-             if (reader == null)
+             SSTableReader reader = null;
 -            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
++            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
              {
 -                RowIndexEntry.Serializer.skip(input);
 +                // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume it's the current version. The only case where we'll be
 +                // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
 +                // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
 +                // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
-                 RowIndexEntry.Serializer.skipPromotedIndex(input, BigFormat.instance.getLatestVersion());
++                RowIndexEntry.Serializer.skip(input, BigFormat.instance.getLatestVersion());
                  return null;
              }
 -            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
 +            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
 +                                                                                                                reader.descriptor.version,
 +                                                                                                                SerializationHeader.forKeyCache(cfs.metadata));
 +            RowIndexEntry entry = indexSerializer.deserialize(input);
-             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
          }
  
 -        private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)
 +        private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
          {
              for (SSTableReader sstable : collection)
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 1408a70,075c8f7..f9ee9e8
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -26,8 -26,11 +26,10 @@@ import java.net.UnknownHostException
  import java.rmi.registry.LocateRegistry;
  import java.rmi.server.RMIServerSocketFactory;
  import java.util.Collections;
+ import java.util.List;
  import java.util.Map;
 -import java.util.UUID;
  import java.util.concurrent.TimeUnit;
+ 
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.StandardMBean;
@@@ -40,8 -42,10 +42,10 @@@ import com.codahale.metrics.Meter
  import com.codahale.metrics.MetricRegistryListener;
  import com.codahale.metrics.SharedMetricRegistries;
  import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.Uninterruptibles;
 -import org.apache.cassandra.metrics.DefaultNameFactory;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index f0ad46f,fa370dc..5e4ef39
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2652,15 -2624,15 +2659,15 @@@ public class StorageService extends Not
  
      /**
       * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
-      * 
-      * 
+      *
+      *
       * @param tag
       *            the tag given to the snapshot; may not be null or empty
 -     * @param columnFamilyList
 -     *            list of columnfamily from different keyspace in the form of ks1.cf1 ks2.cf2
 +     * @param tableList
 +     *            list of tables from different keyspace in the form of ks1.cf1 ks2.cf2
       */
      @Override
 -    public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList)
 +    public void takeMultipleTableSnapshot(String tag, String... tableList)
              throws IOException
      {
          Map<Keyspace, List<String>> keyspaceColumnfamily = new HashMap<Keyspace, List<String>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/FBUtilities.java
index c4b4193,a16fa13..83b07e0
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@@ -775,29 -793,19 +775,45 @@@ public class FBUtilitie
          digest.update((byte)  ((val >>> 0) & 0xFF));
      }
  
 +    public static void updateWithBoolean(MessageDigest digest, boolean val)
 +    {
 +        updateWithByte(digest, val ? 0 : 1);
 +    }
 +
 +    public static void closeAll(List<? extends AutoCloseable> l) throws Exception
 +    {
 +        Exception toThrow = null;
 +        for (AutoCloseable c : l)
 +        {
 +            try
 +            {
 +                c.close();
 +            }
 +            catch (Exception e)
 +            {
 +                if (toThrow == null)
 +                    toThrow = e;
 +                else
 +                    toThrow.addSuppressed(e);
 +            }
 +        }
 +        if (toThrow != null)
 +            throw toThrow;
 +    }
++
+     public static byte[] toWriteUTFBytes(String s)
+     {
+         try
+         {
+             ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream dos = new DataOutputStream(baos);
+             dos.writeUTF(s);
+             dos.flush();
+             return baos.toByteArray();
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index c4157ea,475e436..0c7e8a5
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@@ -78,9 -76,8 +78,8 @@@ public class AutoSavingCacheTes
          Assert.assertEquals(0, keyCache.size());
  
          // then load saved
-         keyCache.loadSaved(cfs);
-         Assert.assertEquals(2, keyCache.size());
+         keyCache.loadSavedAsync().get();
 -        for (SSTableReader sstable : cfs.getSSTables())
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
-             Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
+             Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 21a41c4,bfcfa59..cd52d35
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@@ -18,27 -19,27 +18,33 @@@
   */
  package org.apache.cassandra.cache;
  
 -
  import java.nio.ByteBuffer;
 -import java.util.ArrayList;
 -import java.util.List;
 +import java.security.MessageDigest;
 +import java.security.NoSuchAlgorithmException;
- import java.util.*;
  
  import org.junit.BeforeClass;
  import org.junit.Test;
 +import static org.junit.Assert.*;
 +
++
++import java.util.ArrayList;
++import java.util.List;
++
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.ArrayBackedSortedColumns;
 -import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.Pair;
+ 
  import com.googlecode.concurrentlinkedhashmap.Weighers;
  
 -import static org.apache.cassandra.Util.column;
 -import static org.junit.Assert.*;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.partitions.*;
- import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.FBUtilities;
  
  public class CacheProviderTest
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 9fef63f,1a60d6d..861e840
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@@ -26,15 -25,19 +26,20 @@@ import java.util.List
  import org.junit.Assert;
  import org.junit.Test;
  
 -import com.google.common.collect.ImmutableSet;
 -
+ import org.apache.cassandra.cache.KeyCacheKey;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.metrics.CacheMetrics;
  import org.apache.cassandra.metrics.CassandraMetricsRegistry;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.utils.Pair;
  
  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertNull;
++import org.apache.cassandra.utils.Pair;
++
  
  public class KeyCacheCqlTest extends CQLTester
  {
@@@ -167,6 -71,49 +172,47 @@@
          insertData(table, "some_index", true);
          clearCache();
  
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
+         long hits = metrics.hits.getCount();
+         long requests = metrics.requests.getCount();
 -        assertEquals(4900, hits);
 -        assertEquals(5250, requests);
 -
 -        //
++        assertEquals(0, hits);
++        assertEquals(210, requests);
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+             assertEquals(500, result.size());
+         }
+ 
+         metrics = CacheService.instance.keyCache.getMetrics();
+         hits = metrics.hits.getCount();
+         requests = metrics.requests.getCount();
 -        assertEquals(10000, hits);
 -        assertEquals(10500, requests);
++        assertEquals(200, hits);
++        assertEquals(420, requests);
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         int beforeSize = CacheService.instance.keyCache.size();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(beforeSize, CacheService.instance.keyCache.size());
+ 
          for (int i = 0; i < 10; i++)
          {
              UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
@@@ -173,11 -123,44 +222,44 @@@
              assertEquals(500, result.size());
          }
  
+         //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths
+         //are defensive and unreachable
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+ 
+         dropTable("DROP TABLE %s");
+ 
+         //Test loading for a dropped 2i/table
+         CacheService.instance.keyCache.clear();
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+ 
+     @Test
+     public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                    + commonColumnsDef
+                                    + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+ 
          CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
          long hits = metrics.hits.getCount();
          long requests = metrics.requests.getCount();
 -        assertEquals(4900, hits);
 -        assertEquals(5250, requests);
 +        assertEquals(0, hits);
 +        assertEquals(210, requests);
  
          //
  
@@@ -193,110 -176,29 +275,129 @@@
          metrics = CacheService.instance.keyCache.getMetrics();
          hits = metrics.hits.getCount();
          requests = metrics.requests.getCount();
 -        assertEquals(10000, hits);
 -        assertEquals(10500, requests);
 +        assertEquals(200, hits);
 +        assertEquals(420, requests);
+ 
+         dropTable("DROP TABLE %s");
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+         while(iter.hasNext())
+         {
+             KeyCacheKey key = iter.next();
+             Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+             Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+         }
      }
  
 +    @Test
 +    public void testKeyCacheNonClustered() throws Throwable
 +    {
 +        String table = createTable("CREATE TABLE %s ("
 +                                   + commonColumnsDef
 +                                   + "PRIMARY KEY ((part_key_a, part_key_b)))");
 +        insertData(table, null, false);
 +        clearCache();
 +
 +        for (int i = 0; i < 10; i++)
 +        {
 +            assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
 +                       new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
 +        }
 +
 +        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
 +        long hits = metrics.hits.getCount();
 +        long requests = metrics.requests.getCount();
 +        assertEquals(0, hits);
 +        assertEquals(10, requests);
 +
 +        for (int i = 0; i < 100; i++)
 +        {
 +            assertRows(execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)),
 +                       new Object[]{ String.valueOf(i) + '-' + String.valueOf(0) });
 +        }
 +
 +        hits = metrics.hits.getCount();
 +        requests = metrics.requests.getCount();
 +        assertEquals(10, hits);
 +        assertEquals(120, requests);
 +    }
 +
 +    @Test
 +    public void testKeyCacheClustered() throws Throwable
 +    {
 +        String table = createTable("CREATE TABLE %s ("
 +                                   + commonColumnsDef
 +                                   + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
 +        insertData(table, null, true);
 +        clearCache();
 +
 +        // query on partition key
 +
 +        // 10 queries, each 50 result rows
 +        for (int i = 0; i < 10; i++)
 +        {
 +            assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
 +        }
 +
 +        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
 +        long hits = metrics.hits.getCount();
 +        long requests = metrics.requests.getCount();
 +        assertEquals(0, hits);
 +        assertEquals(10, requests);
 +
 +        // 10 queries, each 50 result rows
 +        for (int i = 0; i < 10; i++)
 +        {
 +            assertEquals(50, execute("SELECT col_text FROM %s WHERE part_key_a = ? AND part_key_b = ?", i, Integer.toOctalString(i)).size());
 +        }
 +
 +        metrics = CacheService.instance.keyCache.getMetrics();
 +        hits = metrics.hits.getCount();
 +        requests = metrics.requests.getCount();
 +        assertEquals(10, hits);
 +        assertEquals(10 + 10, requests);
 +
 +        // 100 queries - must get a hit in key-cache
 +        for (int i = 0; i < 10; i++)
 +        {
 +            for (int c = 0; c < 10; c++)
 +            {
 +                assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
 +                           new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
 +            }
 +        }
 +
 +        metrics = CacheService.instance.keyCache.getMetrics();
 +        hits = metrics.hits.getCount();
 +        requests = metrics.requests.getCount();
 +        assertEquals(10 + 100, hits);
 +        assertEquals(20 + 100, requests);
 +
 +        // 5000 queries - first 10 partitions already in key cache
 +        for (int i = 0; i < 100; i++)
 +        {
 +            for (int c = 0; c < 50; c++)
 +            {
 +                assertRows(execute("SELECT col_text, col_long FROM %s WHERE part_key_a = ? AND part_key_b = ? and clust_key_a = ?", i, Integer.toOctalString(i), c),
 +                           new Object[]{ String.valueOf(i) + '-' + String.valueOf(c), (long) c });
 +            }
 +        }
 +
 +        hits = metrics.hits.getCount();
 +        requests = metrics.requests.getCount();
 +        assertEquals(110 + 4910, hits);
 +        assertEquals(120 + 5500, requests);
 +    }
 +
      // Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
      // Clustered tables receive 50 CQL rows per partition.
      private void insertData(String table, String index, boolean withClustering) throws Throwable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 517bce6,5b37b2c..65ec420
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -28,10 -24,14 +28,11 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.service.CacheService;
 -import org.apache.cassandra.utils.FBUtilities;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -68,42 -63,40 +69,44 @@@ public class CounterCacheTes
      @Test
      public void testReadWrite()
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
 -        assertEquals(0, CacheService.instance.counterCache.size());
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
 +        Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
 +        Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
 +        ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
  
 -        cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
 -        cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
 -        cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
 -        cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
 -
 -        assertEquals(4, CacheService.instance.counterCache.size());
 -        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
 +        assertEquals(0, CacheService.instance.counterCache.size());
 +        assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null));
 +        assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null));
 +        assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null));
 +        assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null));
 +
 +        cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L));
 +        cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L));
 +        cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L));
 +        cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L));
 +
 +        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
 +        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
 +        assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
 +        assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
      }
  
      @Test
      public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
 -        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
 +        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
 +        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
 +        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
 +        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
 +
 +        assertEquals(4, CacheService.instance.counterCache.size());
  
          // flush the counter cache and invalidate
          CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
@@@ -111,16 -104,76 +114,79 @@@
          assertEquals(0, CacheService.instance.counterCache.size());
  
          // load from cache and validate
-         CacheService.instance.counterCache.loadSaved(cfs);
+         CacheService.instance.counterCache.loadSaved();
          assertEquals(4, CacheService.instance.counterCache.size());
 -        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
 -        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
 -        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
 -        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
 +
 +        Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build();
 +        Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build();
 +        ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c"));
 +
 +        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null));
 +        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null));
 +        assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null));
 +        assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null));
      }
+ 
+     @Test
+     public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
 -        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
+         Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1);
+ 
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(ks);
+         }
+     }
+ 
+     @Test
+     public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
 -        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
 -        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(1)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(1).add("c", 1L).build(), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new RowUpdateBuilder(cfs.metadata, 0, bytes(2)).clustering(2).add("c", 2L).build(), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
+ 
+         CacheService.instance.setCounterCacheCapacityInMB(0);
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             CacheService.instance.setCounterCacheCapacityInMB(1);
+         }
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index be22b45,5912d7c..d407f7a
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -71,53 -72,6 +71,53 @@@ public class RowCacheTes
      }
  
      @Test
 +    public void testRoundTrip() throws Exception
 +    {
 +        CompactionManager.instance.disableAutoCompaction();
 +
 +        Keyspace keyspace = Keyspace.open(KEYSPACE_CACHED);
 +        String cf = "CachedIntCF";
 +        ColumnFamilyStore cachedStore  = keyspace.getColumnFamilyStore(cf);
 +        long startRowCacheHits = cachedStore.metric.rowCacheHit.getCount();
 +        long startRowCacheOutOfRange = cachedStore.metric.rowCacheHitOutOfRange.getCount();
 +        // empty the row cache
 +        CacheService.instance.invalidateRowCache();
 +
 +        // set global row cache size to 1 MB
 +        CacheService.instance.setRowCacheCapacityInMB(1);
 +
 +        ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
 +        DecoratedKey dk = cachedStore.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
++        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
 +
 +        RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
 +        rub.clustering(String.valueOf(0));
 +        rub.add("val", ByteBufferUtil.bytes("val" + 0));
 +        rub.build().applyUnsafe();
 +
 +        // populate row cache, we should not get a row cache hit;
 +        Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
 +        assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
 +
 +        // do another query, limit is 1, which is < 100 that we cache, we should get a hit and it should be in range
 +        Util.getAll(Util.cmd(cachedStore, dk).withLimit(1).build());
 +        assertEquals(++startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
 +        assertEquals(startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.getCount());
 +
 +        CachedPartition cachedCf = (CachedPartition)CacheService.instance.rowCache.get(rck);
 +        assertEquals(1, cachedCf.rowCount());
 +        for (Unfiltered unfiltered : Util.once(cachedCf.unfilteredIterator(ColumnFilter.selection(cachedCf.columns()), Slices.ALL, false)))
 +        {
 +            Row r = (Row) unfiltered;
 +            for (ColumnData c : r)
 +            {
 +                assertEquals(((Cell)c).value(), ByteBufferUtil.bytes("val" + 0));
 +            }
 +        }
 +        cachedStore.truncateBlocking();
 +    }
 +
 +    @Test
      public void testRowCache() throws Exception
      {
          CompactionManager.instance.disableAutoCompaction();
@@@ -254,21 -231,18 +289,21 @@@
          CacheService.instance.setRowCacheCapacityInMB(1);
  
          ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
 -        DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
 +        DecoratedKey dk = cachedStore.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
 -        Mutation mutation = new Mutation(KEYSPACE_CACHED, key);
 +        String values[] = new String[200];
          for (int i = 0; i < 200; i++)
 -            mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
 -        mutation.applyUnsafe();
 +        {
 +            RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
 +            rub.clustering(String.valueOf(i));
 +            values[i] = "val" + i;
 +            rub.add("val", ByteBufferUtil.bytes(values[i]));
 +            rub.build().applyUnsafe();
 +        }
 +        Arrays.sort(values);
  
          // populate row cache, we should not get a row cache hit;
 -        cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf,
 -                                                                Composites.EMPTY,
 -                                                                Composites.EMPTY,
 -                                                                false, 10, System.currentTimeMillis()));
 +        Util.getAll(Util.cmd(cachedStore, dk).withLimit(10).build());
          assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.getCount());
  
          // do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
@@@ -335,19 -309,6 +370,19 @@@
          // empty the cache again to make sure values came from disk
          CacheService.instance.invalidateRowCache();
          assertEquals(0, CacheService.instance.rowCache.size());
-         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
+         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved());
      }
 +
 +    private static void readData(String keyspace, String columnFamily, int offset, int numberOfRows)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 +        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +
 +        for (int i = offset; i < offset + numberOfRows; i++)
 +        {
 +            DecoratedKey key = Util.dk("key" + i);
 +            Clustering cl = new Clustering(ByteBufferUtil.bytes("col" + i));
 +            Util.getAll(Util.cmd(store, key).build());
 +        }
 +    }
  }