You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/16 22:04:27 UTC

[2/2] cassandra git commit: 2i key cache load fails

2i key cache load fails

patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-10155


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e889ee40
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e889ee40
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e889ee40

Branch: refs/heads/cassandra-2.1
Commit: e889ee408bec5330c312ff6b72a81a0012fdf2a5
Parents: 6479d94
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Sep 16 21:57:54 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Sep 16 21:57:54 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 172 +++++++-----
 .../org/apache/cassandra/cache/CacheKey.java    |  14 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  26 +-
 .../org/apache/cassandra/cache/KeyCacheKey.java |  19 +-
 .../org/apache/cassandra/cache/RowCacheKey.java |  30 +--
 .../org/apache/cassandra/config/CFMetaData.java |   9 +
 .../cassandra/config/DatabaseDescriptor.java    |  15 +-
 .../org/apache/cassandra/config/Schema.java     |  56 +++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  75 ++----
 src/java/org/apache/cassandra/db/Keyspace.java  |   4 -
 .../org/apache/cassandra/db/RowIndexEntry.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  21 +-
 .../cassandra/io/sstable/SSTableReader.java     |   8 +-
 .../apache/cassandra/service/CacheService.java  |  58 ++--
 .../cassandra/service/CassandraDaemon.java      |  45 +++-
 .../cassandra/service/StorageService.java       |  31 ++-
 .../org/apache/cassandra/utils/FBUtilities.java |  16 ++
 .../cassandra/cache/AutoSavingCacheTest.java    |   5 +-
 .../cassandra/cache/CacheProviderTest.java      |  16 +-
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  | 263 +++++++++++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   |  70 ++++-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  42 ++-
 24 files changed, 739 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2787739..207f16a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.10
+ * Fix cache handling of 2i and base tables (CASSANDRA-10155)
  * Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
  * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
  * BATCH statement is broken in cqlsh (CASSANDRA-10272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 98e3e59..3ebbc76 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cache;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +29,10 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -60,7 +66,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     protected final CacheService.CacheType cacheType;
 
     private CacheSerializer<K, V> cacheLoader;
-    private static final String CURRENT_VERSION = "b";
+
+    /*
+     * CASSANDRA-10155 required a format change to fix caching for secondary indexes (2i).
+     * 2.2 is already at version "c" and 3.0 is at "d", so neither letter can be used here.
+     *
+     * Since cache versions must match exactly and there is no partial fallback, just append
+     * a minor version letter to the current one.
+     */
+    private static final String CURRENT_VERSION = "ba";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {
@@ -88,16 +102,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         this.cacheLoader = cacheloader;
     }
 
-    @Deprecated
-    public File getCachePath(String ksName, String cfName, UUID cfId, String version)
-    {
-        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cfId, cacheType, version);
-    }
-
-    public File getCachePath(UUID cfId, String version)
+    public File getCachePath(String version)
     {
-        Pair<String, String> names = Schema.instance.getCF(cfId);
-        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version);
+        return DatabaseDescriptor.getSerializedCachePath(cacheType, version);
     }
 
     public Writer getWriter(int keysToSave)
@@ -128,16 +135,42 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         }
     }
 
-    public int loadSaved(ColumnFamilyStore cfs)
+    public ListenableFuture<Integer> loadSavedAsync()
+    {
+        final ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        final long start = System.nanoTime();
+
+        ListenableFuture<Integer> cacheLoad = es.submit(new Callable<Integer>()
+        {
+            @Override
+            public Integer call() throws Exception
+            {
+                return loadSaved();
+            }
+        });
+        cacheLoad.addListener(new Runnable() {
+            @Override
+            public void run()
+            {
+                if (size() > 0)
+                    logger.info("Completed loading ({} ms; {} keys) {} cache",
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
+                            CacheService.instance.keyCache.size(),
+                            cacheType);
+                es.shutdown();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+
+        return cacheLoad;
+    }
+
+    public int loadSaved()
     {
         int count = 0;
         long start = System.nanoTime();
 
         // modern format, allows both key and value (so key cache load can be purely sequential)
-        File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
-        // if path does not exist, try without cfId (assuming saved cache is created with current CF)
-        if (!path.exists())
-            path = getCachePath(cfs.keyspace.getName(), cfs.name, null, CURRENT_VERSION);
+        File path = getCachePath(CURRENT_VERSION);
         if (path.exists())
         {
             DataInputStream in = null;
@@ -145,28 +178,57 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             {
                 logger.info(String.format("reading saved cache %s", path));
                 in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
-                List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+                ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
+
                 while (in.available() > 0)
                 {
-                    Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+                    // ksname and cfname are written by the serializers in CacheService.
+                    // Writing is delegated there because there are serializer-specific conditions
+                    // under which a cache key is skipped and not written.
+                    String ksname = in.readUTF();
+                    String cfname = in.readUTF();
+
+                    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+
+                    Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                     // Key cache entry can return null, if the SSTable doesn't exist.
-                    if (entry == null)
+                    if (entryFuture == null)
                         continue;
-                    futures.add(entry);
+
+                    futures.offer(entryFuture);
                     count++;
+
+                    /*
+                     * It is unwise to accrue an unbounded number of pending futures,
+                     * so this loop drains completed ones and keeps at most 1000 pending.
+                     */
+                    do
+                    {
+                        while (futures.peek() != null && futures.peek().isDone())
+                        {
+                            Future<Pair<K, V>> future = futures.poll();
+                            Pair<K, V> entry = future.get();
+                            if (entry != null && entry.right != null)
+                                put(entry.left, entry.right);
+                        }
+
+                        if (futures.size() > 1000)
+                            Thread.yield();
+                    } while(futures.size() > 1000);
                 }
 
-                for (Future<Pair<K, V>> future : futures)
+                Future<Pair<K, V>> future = null;
+                while ((future = futures.poll()) != null)
                 {
                     Pair<K, V> entry = future.get();
                     if (entry != null && entry.right != null)
                         put(entry.left, entry.right);
                 }
             }
-            catch (Exception e)
+            catch (Throwable t)
             {
-                JVMStabilityInspector.inspectThrowable(e);
-                logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
+                JVMStabilityInspector.inspectThrowable(t);
+                logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t);
             }
             finally
             {
@@ -238,44 +300,33 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-            HashMap<UUID, OutputStream> streams = new HashMap<>();
-            HashMap<UUID, File> paths = new HashMap<>();
-
+            DataOutputStreamPlus writer = null;
+            File tempCacheFile = tempCacheFile();
             try
             {
+                try
+                {
+                    writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile));
+                }
+                catch (FileNotFoundException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
                 for (K key : keys)
                 {
-                    UUID cfId = key.getCFId();
-                    if (!Schema.instance.hasCF(key.getCFId()))
-                        continue; // the table has been dropped.
 
-                    DataOutputPlus writer = writers.get(cfId);
-                    if (writer == null)
-                    {
-                        File writerPath = tempCacheFile(cfId);
-                        OutputStream stream;
-                        try
-                        {
-                            stream = streamFactory.getOutputStream(writerPath);
-                            writer = new DataOutputStreamPlus(stream);
-                        }
-                        catch (FileNotFoundException e)
-                        {
-                            throw new RuntimeException(e);
-                        }
-                        paths.put(cfId, writerPath);
-                        streams.put(cfId, stream);
-                        writers.put(cfId, writer);
-                    }
+                    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+                    if (cfs == null)
+                        continue; // the table or 2i has been dropped.
 
                     try
                     {
-                        cacheLoader.serialize(key, writer);
+                        cacheLoader.serialize(key, writer, cfs);
                     }
                     catch (IOException e)
                     {
-                        throw new FSWriteError(e, paths.get(cfId));
+                        throw new FSWriteError(e, tempCacheFile);
                     }
 
                     keysWritten++;
@@ -283,28 +334,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             finally
             {
-                for (OutputStream writer : streams.values())
+                if (writer != null)
                     FileUtils.closeQuietly(writer);
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
-            {
-                UUID cfId = entry.getKey();
+            File cacheFile = getCachePath(CURRENT_VERSION);
 
-                File tmpFile = paths.get(cfId);
-                File cacheFile = getCachePath(cfId, CURRENT_VERSION);
+            cacheFile.delete(); // ignore error if it didn't exist
 
-                cacheFile.delete(); // ignore error if it didn't exist
-                if (!tmpFile.renameTo(cacheFile))
-                    logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
-            }
+            if (!tempCacheFile.renameTo(cacheFile))
+                logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile);
 
             logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private File tempCacheFile(UUID cfId)
+        private File tempCacheFile()
         {
-            File path = getCachePath(cfId, CURRENT_VERSION);
+            File path = getCachePath(CURRENT_VERSION);
             return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
         }
 
@@ -337,7 +383,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
     public interface CacheSerializer<K extends CacheKey, V>
     {
-        void serialize(K key, DataOutputPlus out) throws IOException;
+        void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException;
 
         Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CacheKey.java b/src/java/org/apache/cassandra/cache/CacheKey.java
index 44fead0..0e82990 100644
--- a/src/java/org/apache/cassandra/cache/CacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CacheKey.java
@@ -17,12 +17,14 @@
  */
 package org.apache.cassandra.cache;
 
-import java.util.UUID;
+import org.apache.cassandra.utils.Pair;
 
-public interface CacheKey extends IMeasurableMemory
+public abstract class CacheKey implements IMeasurableMemory
 {
-    /**
-     * @return The cf id of the cache key.
-     */
-    public UUID getCFId();
+    public final Pair<String, String> ksAndCFName;
+
+    public CacheKey(Pair<String, String> ksAndCFName)
+    {
+        this.ksAndCFName = ksAndCFName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CounterCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
index 60247c5..68856eb 100644
--- a/src/java/org/apache/cassandra/cache/CounterCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
@@ -19,36 +19,28 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.utils.*;
 
-public class CounterCacheKey implements CacheKey
+public final class CounterCacheKey extends CacheKey
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1))))
-                                           + ObjectSizes.measure(new UUID(0, 0));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1))));
 
-    public final UUID cfId;
     public final byte[] partitionKey;
     public final byte[] cellName;
 
-    private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    private CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
     {
-        this.cfId = cfId;
+        super(ksAndCFName);
         this.partitionKey = ByteBufferUtil.getArray(partitionKey);
         this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
     }
 
-    public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
     {
-        return new CounterCacheKey(cfId, partitionKey, cellName);
-    }
-
-    public UUID getCFId()
-    {
-        return cfId;
+        return new CounterCacheKey(ksAndCFName, partitionKey, cellName);
     }
 
     public long unsharedHeapSize()
@@ -62,7 +54,7 @@ public class CounterCacheKey implements CacheKey
     public String toString()
     {
         return String.format("CounterCacheKey(%s, %s, %s)",
-                             cfId,
+                             ksAndCFName,
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)),
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName)));
     }
@@ -70,7 +62,7 @@ public class CounterCacheKey implements CacheKey
     @Override
     public int hashCode()
     {
-        return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName});
+        return Arrays.deepHashCode(new Object[]{ksAndCFName, partitionKey, cellName});
     }
 
     @Override
@@ -84,7 +76,7 @@ public class CounterCacheKey implements CacheKey
 
         CounterCacheKey cck = (CounterCacheKey) o;
 
-        return cfId.equals(cck.cfId)
+        return ksAndCFName.equals(cck.ksAndCFName)
             && Arrays.equals(partitionKey, cck.partitionKey)
             && Arrays.equals(cellName, cck.cellName);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/KeyCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index cef37ce..222622c 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -19,15 +19,14 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
 
-public class KeyCacheKey implements CacheKey
+public class KeyCacheKey extends CacheKey
 {
-    public final UUID cfId;
     public final Descriptor desc;
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new KeyCacheKey(null, null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
@@ -36,19 +35,15 @@ public class KeyCacheKey implements CacheKey
     // without extra copies on lookup since client-provided key ByteBuffers will be array-backed already
     public final byte[] key;
 
-    public KeyCacheKey(UUID cfId, Descriptor desc, ByteBuffer key)
+    public KeyCacheKey(Pair<String, String> ksAndCFName, Descriptor desc, ByteBuffer key)
     {
-        this.cfId = cfId;
+
+        super(ksAndCFName);
         this.desc = desc;
         this.key = ByteBufferUtil.getArray(key);
         assert this.key != null;
     }
 
-    public UUID getCFId()
-    {
-        return cfId;
-    }
-
     public String toString()
     {
         return String.format("KeyCacheKey(%s, %s)", desc, ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key)));
@@ -67,13 +62,13 @@ public class KeyCacheKey implements CacheKey
 
         KeyCacheKey that = (KeyCacheKey) o;
 
-        return cfId.equals(that.cfId) && desc.equals(that.desc) && Arrays.equals(key, that.key);
+        return ksAndCFName.equals(that.ksAndCFName) && desc.equals(that.desc) && Arrays.equals(key, that.key);
     }
 
     @Override
     public int hashCode()
     {
-        int result = cfId.hashCode();
+        int result = ksAndCFName.hashCode();
         result = 31 * result + desc.hashCode();
         result = 31 * result + Arrays.hashCode(key);
         return result;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index af2d4d4..c959fd1 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -19,37 +19,30 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
 
-public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
+public final class RowCacheKey extends CacheKey
 {
-    public final UUID cfId;
     public final byte[] key;
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-    public RowCacheKey(UUID cfId, DecoratedKey key)
+    public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key)
     {
-        this(cfId, key.getKey());
+        this(ksAndCFName, key.getKey());
     }
 
-    public RowCacheKey(UUID cfId, ByteBuffer key)
+    public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key)
     {
-        this.cfId = cfId;
+        super(ksAndCFName);
         this.key = ByteBufferUtil.getArray(key);
         assert this.key != null;
     }
 
-    public UUID getCFId()
-    {
-        return cfId;
-    }
-
     public long unsharedHeapSize()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOfArray(key);
@@ -63,25 +56,20 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
 
         RowCacheKey that = (RowCacheKey) o;
 
-        return cfId.equals(that.cfId) && Arrays.equals(key, that.key);
+        return ksAndCFName.equals(that.ksAndCFName) && Arrays.equals(key, that.key);
     }
 
     @Override
     public int hashCode()
     {
-        int result = cfId.hashCode();
+        int result = ksAndCFName.hashCode();
         result = 31 * result + (key != null ? Arrays.hashCode(key) : 0);
         return result;
     }
 
-    public int compareTo(RowCacheKey otherKey)
-    {
-        return (cfId.compareTo(otherKey.cfId) < 0) ? -1 : ((cfId.equals(otherKey.cfId)) ?  FBUtilities.compareUnsigned(key, otherKey.key, 0, 0, key.length, otherKey.key.length) : 1);
-    }
-
     @Override
     public String toString()
     {
-        return String.format("RowCacheKey(cfId:%s, key:%s)", cfId, Arrays.toString(key));
+        return String.format("RowCacheKey(ksAndCFName:%s, key:%s)", ksAndCFName, Arrays.toString(key));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 2c6a30c..2939f09 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.github.jamm.Unmetered;
 
@@ -385,6 +386,8 @@ public final class CFMetaData
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
     public final String cfName;                       // name of this column family
+    public final Pair<String, String> ksAndCFName;
+    public final byte[] ksAndCFBytes;
     public final ColumnFamilyType cfType;             // standard, super
     public volatile CellNameType comparator;          // bytes, long, timeuuid, utf8, etc.
 
@@ -475,6 +478,12 @@ public final class CFMetaData
         cfId = id;
         ksName = keyspace;
         cfName = name;
+        ksAndCFName = Pair.create(keyspace, name);
+        byte[] ksBytes = FBUtilities.toWriteUTFBytes(ksName);
+        byte[] cfBytes = FBUtilities.toWriteUTFBytes(cfName);
+        ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length);
+        System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length);
+
         cfType = type;
         comparator = comp;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3a6a8fd..84381a0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1437,16 +1437,11 @@ public class DatabaseDescriptor
         return conf.index_interval;
     }
 
-    public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version)
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append(ksName).append('-');
-        builder.append(cfName).append('-');
-        if (cfId != null)
-            builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
-        builder.append(cacheType);
-        builder.append((version == null ? "" : "-" + version + ".db"));
-        return new File(conf.saved_caches_directory, builder.toString());
+    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version)
+    {
+        String name = cacheType.toString()
+                + (version == null ? "" : "-" + version + ".db");
+        return new File(conf.saved_caches_directory, name);
     }
 
     public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8e9802f..fada670 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.service.MigrationManager;
@@ -129,6 +130,53 @@ public class Schema
         return keyspaceInstances.get(keyspaceName);
     }
 
+    /**
+     * Retrieve a CFS by name, even if that CFS is an index.
+     *
+     * An index is identified by looking for '.' in the CF name; the part before the '.' names the base table
+     * containing the index.
+     * @param ksNameAndCFName the keyspace name (left) and the CF or index name (right)
+     * @return The named CFS, or null if the keyspace, base table, or index does not exist
+     */
+    public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair<String, String> ksNameAndCFName) {
+        String ksName = ksNameAndCFName.left;
+        String cfName = ksNameAndCFName.right;
+        Pair<String, String> baseTable;
+
+        /*
+         * String.split() does special-case a one-character regex, and it looks like it can also detect
+         * a two-character escaped '.', but it still allocates a useless array, so indexOf() is used instead.
+         */
+        int indexOfSeparator = cfName.indexOf('.');
+        if (indexOfSeparator > -1)
+            baseTable = Pair.create(ksName, cfName.substring(0, indexOfSeparator));
+        else
+            baseTable = ksNameAndCFName;
+
+        UUID cfId = cfIdMap.get(baseTable);
+        if (cfId == null)
+            return null;
+
+        Keyspace ks = keyspaceInstances.get(ksName);
+        if (ks == null)
+            return null;
+
+        ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId);
+
+        //Not an index
+        if (indexOfSeparator == -1)
+            return baseCFS;
+
+        if (baseCFS == null)
+            return null;
+
+        SecondaryIndex index = baseCFS.indexManager.getIndexByName(cfName);
+        if (index == null)
+            return null;
+
+        return index.getIndexCfs();
+    }
+
     public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
     {
         Pair<String, String> pair = cfIdMap.inverse().get(cfId);
@@ -302,12 +350,12 @@ public class Schema
     }
 
     /**
-     * @param cfId The identifier of the ColumnFamily to lookup
-     * @return true if the CF id is a known one, false otherwise.
+     * @param ksAndCFName The keyspace and ColumnFamily name pair to look up
+     * @return true if the KS and CF pair is a known one, false otherwise.
      */
-    public boolean hasCF(UUID cfId)
+    public boolean hasCF(Pair<String, String> ksAndCFName)
     {
-        return cfIdMap.containsValue(cfId);
+        return cfIdMap.containsKey(ksAndCFName);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 25b3e57..ffaa276 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -358,8 +358,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
 
-        CachingOptions caching = metadata.getCaching();
-
         logger.info("Initializing {}.{}", keyspace.getName(), name);
 
         // scan for sstables corresponding to this cf and load them
@@ -372,9 +370,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             data.addInitialSSTables(sstables);
         }
 
-        if (caching.keyCache.isEnabled())
-            CacheService.instance.keyCache.loadSaved(this);
-
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategyWrapper = new WrappingCompactionStrategy(this);
 
@@ -632,7 +627,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions)
     {
         Directories directories = new Directories(metadata);
-
         Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
@@ -702,39 +696,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    // must be called after all sstables are loaded since row cache merges all row versions
-    public void initRowCache()
-    {
-        if (!isRowCacheEnabled())
-            return;
-
-        long start = System.nanoTime();
-
-        int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
-        if (cachedRowsRead > 0)
-            logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                        cachedRowsRead,
-                        keyspace.getName(),
-                        name);
-    }
-
-    public void initCounterCache()
-    {
-        if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
-            return;
-
-        long start = System.nanoTime();
-
-        int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
-        if (cachedShardsRead > 0)
-            logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                        cachedShardsRead,
-                        keyspace.getName(),
-                        name);
-    }
-
     /**
      * See #{@code StorageService.loadNewSSTables(String, String)} for more info
      *
@@ -1246,7 +1207,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (!isRowCacheEnabled())
             return;
 
-        RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
+        RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key);
         invalidateCachedRow(cacheKey);
     }
 
@@ -1696,7 +1657,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         assert isRowCacheEnabled()
                : String.format("Row cache is not enabled on column family [" + name + "]");
 
-        RowCacheKey key = new RowCacheKey(cfId, filter.key);
+        RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
 
         // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
         // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
@@ -2068,7 +2029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
         {
             DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+            if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                 invalidateCachedRow(dk);
         }
 
@@ -2077,7 +2038,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
             {
                 DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+                if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                     CacheService.instance.counterCache.remove(key);
             }
         }
@@ -2527,16 +2488,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (!isRowCacheEnabled())
             return null;
 
-        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key));
+        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.ksAndCFName, key));
         return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached;
     }
 
     private void invalidateCaches()
     {
-        CacheService.instance.invalidateKeyCacheForCf(metadata.cfId);
-        CacheService.instance.invalidateRowCacheForCf(metadata.cfId);
+        CacheService.instance.invalidateKeyCacheForCf(metadata.ksAndCFName);
+        CacheService.instance.invalidateRowCacheForCf(metadata.ksAndCFName);
         if (metadata.isCounter())
-            CacheService.instance.invalidateCounterCacheForCf(metadata.cfId);
+            CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
     }
 
     /**
@@ -2544,7 +2505,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public boolean containsCachedRow(DecoratedKey key)
     {
-        return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
+        return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key));
     }
 
     public void invalidateCachedRow(RowCacheKey key)
@@ -2558,21 +2519,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (cfId == null)
             return; // secondary index
 
-        invalidateCachedRow(new RowCacheKey(cfId, key));
+        invalidateCachedRow(new RowCacheKey(metadata.ksAndCFName, key));
     }
 
     public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return null;
-        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName));
     }
 
     public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return;
-        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
     }
 
     public void forceMajorCompaction() throws InterruptedException, ExecutionException
@@ -3008,11 +2969,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
     }
 
-    private boolean isRowCacheEnabled()
+    public boolean isRowCacheEnabled()
     {
         return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
     }
 
+    public boolean isCounterCacheEnabled()
+    {
+        return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0;
+    }
+
+    public boolean isKeyCacheEnabled()
+    {
+        return metadata.getCaching().keyCache.isEnabled() && CacheService.instance.keyCache.getCapacity() > 0;
+    }
+
     /**
      * Discard all SSTables that were created before given timestamp.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 4f59c40..03c3d2b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -120,10 +120,6 @@ public class Keyspace
                     // open and store the keyspace
                     keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
                     schema.storeKeyspaceInstance(keyspaceInstance);
-
-                    // keyspace has to be constructed and in the cache before cacheRow can be called
-                    for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
-                        cfs.initRowCache();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 01035c4..77b745c 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -142,7 +142,7 @@ public class RowIndexEntry implements IMeasurableMemory
             skipPromotedIndex(in);
         }
 
-        public static void skipPromotedIndex(DataInput in) throws IOException
+        private static void skipPromotedIndex(DataInput in) throws IOException
         {
             int size = in.readInt();
             if (size <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index a9ae069..ad3aae8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -95,7 +95,8 @@ public class SecondaryIndexManager
     /**
      * Keeps all secondary index instances, either per-column or per-row
      */
-    private final Set<SecondaryIndex> allIndexes;
+    private final Collection<SecondaryIndex> allIndexes;
+    private final Map<String, SecondaryIndex> indexesByName;
 
 
     /**
@@ -107,7 +108,8 @@ public class SecondaryIndexManager
     {
         indexesByColumn = new ConcurrentSkipListMap<>();
         rowLevelIndexMap = new ConcurrentHashMap<>();
-        allIndexes = Collections.newSetFromMap(new ConcurrentHashMap<SecondaryIndex, Boolean>());
+        indexesByName = new ConcurrentHashMap<String, SecondaryIndex>();
+        allIndexes = indexesByName.values();
 
         this.baseCfs = baseCfs;
     }
@@ -158,7 +160,7 @@ public class SecondaryIndexManager
     {
         idxNames = filterByColumn(idxNames);
         if (idxNames.isEmpty())
-            return;        
+            return;
 
         logger.info(String.format("Submitting index build of %s for data in %s",
                                   idxNames, StringUtils.join(sstables, ", ")));
@@ -172,7 +174,7 @@ public class SecondaryIndexManager
         logger.info("Index build of {} complete", idxNames);
     }
 
-    public boolean indexes(CellName name, Set<SecondaryIndex> indexes)
+    public boolean indexes(CellName name, Collection<SecondaryIndex> indexes)
     {
         boolean matching = false;
         for (SecondaryIndex index : indexes)
@@ -186,7 +188,7 @@ public class SecondaryIndexManager
         return matching;
     }
 
-    public Set<SecondaryIndex> indexFor(CellName name, Set<SecondaryIndex> indexes)
+    public Set<SecondaryIndex> indexFor(CellName name, Collection<SecondaryIndex> indexes)
     {
         Set<SecondaryIndex> matching = null;
         for (SecondaryIndex index : indexes)
@@ -319,7 +321,7 @@ public class SecondaryIndexManager
         indexesByColumn.put(cdef.name.bytes, index);
 
         // Add to all indexes set:
-        allIndexes.add(index);
+        indexesByName.put(index.getIndexName(), index);
 
         // if we're just linking in the index to indexedColumns on an
         // already-built index post-restart, we're done
@@ -422,11 +424,16 @@ public class SecondaryIndexManager
     /**
      * @return all of the secondary indexes without distinction to the (non-)backed by secondary ColumnFamilyStore.
      */
-    public Set<SecondaryIndex> getIndexes()
+    public Collection<SecondaryIndex> getIndexes()
     {
         return allIndexes;
     }
 
+    public SecondaryIndex getIndexByName(String name)
+    {
+        return indexesByName.get(name);
+    }
+
     /**
      * @return if there are ANY indexes for this table..
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 86f6a23..0f307b0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1513,7 +1513,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
 
     public KeyCacheKey getCacheKey(DecoratedKey key)
     {
-        return new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+        return new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
     }
 
     public void cacheKey(DecoratedKey key, RowIndexEntry info)
@@ -1527,14 +1527,14 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             return;
         }
 
-        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+        KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
         logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
         keyCache.put(cacheKey, info);
     }
 
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
-        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
+        return getCachedPosition(new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey()), updateStats);
     }
 
     private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
@@ -1596,7 +1596,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
         {
             DecoratedKey decoratedKey = (DecoratedKey)key;
-            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
+            KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, decoratedKey.getKey());
             RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
             if (cachedPosition != null)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a43d6d5..50d8903 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -283,13 +282,13 @@ public class CacheService implements CacheServiceMBean
         keyCache.clear();
     }
 
-    public void invalidateKeyCacheForCf(UUID cfId)
+    public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator();
         while (keyCacheIterator.hasNext())
         {
             KeyCacheKey key = keyCacheIterator.next();
-            if (key.cfId.equals(cfId))
+            if (key.ksAndCFName.equals(ksAndCFName))
                 keyCacheIterator.remove();
         }
     }
@@ -299,24 +298,24 @@ public class CacheService implements CacheServiceMBean
         rowCache.clear();
     }
 
-    public void invalidateRowCacheForCf(UUID cfId)
+    public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator();
         while (rowCacheIterator.hasNext())
         {
             RowCacheKey rowCacheKey = rowCacheIterator.next();
-            if (rowCacheKey.cfId.equals(cfId))
+            if (rowCacheKey.ksAndCFName.equals(ksAndCFName))
                 rowCacheIterator.remove();
         }
     }
 
-    public void invalidateCounterCacheForCf(UUID cfId)
+    public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator();
         while (counterCacheIterator.hasNext())
         {
             CounterCacheKey counterCacheKey = counterCacheIterator.next();
-            if (counterCacheKey.cfId.equals(cfId))
+            if (counterCacheKey.ksAndCFName.equals(ksAndCFName))
                 counterCacheIterator.remove();
         }
     }
@@ -405,16 +404,24 @@ public class CacheService implements CacheServiceMBean
 
     public static class CounterCacheSerializer implements CacheSerializer<CounterCacheKey, ClockAndCount>
     {
-        public void serialize(CounterCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(CounterCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
+            assert(cfs.metadata.isCounter());
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.partitionKey, out);
             ByteBufferUtil.writeWithLength(key.cellName, out);
         }
 
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+            //parameter, so they aren't deserialized here even though they are serialized by this serializer
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in);
+            if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+                return null;
+            assert(cfs.metadata.isCounter());
+            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer);
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
@@ -431,7 +438,7 @@ public class CacheService implements CacheServiceMBean
                     if (cell == null || !cell.isLive(Long.MIN_VALUE))
                         return null;
                     ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
                 }
             });
         }
@@ -439,14 +446,22 @@ public class CacheService implements CacheServiceMBean
 
     public static class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
     {
-        public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
+            assert(!cfs.isIndex());
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.key, out);
         }
 
         public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+            //parameter, so they aren't deserialized here even though they are serialized by this serializer
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            if (cfs == null  || !cfs.isRowCacheEnabled())
+                return null;
+            assert(!cfs.isIndex());
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
             {
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
@@ -454,7 +469,7 @@ public class CacheService implements CacheServiceMBean
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
                     QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
                     ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
+                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
                 }
             });
         }
@@ -462,24 +477,23 @@ public class CacheService implements CacheServiceMBean
 
     public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry>
     {
-        public void serialize(KeyCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
             RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
             if (entry == null)
                 return;
 
-            CFMetaData cfm = Schema.instance.getCFMetaData(key.cfId);
-            if (cfm == null)
-                return; // the table no longer exists.
-
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            cfm.comparator.rowIndexEntrySerializer().serialize(entry, out);
+            cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSavingCache and used to fetch the CFS provided as a
+            //parameter, so they aren't deserialized here even though they are serialized by this serializer
             int keyLength = input.readInt();
             if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)
             {
@@ -488,15 +502,15 @@ public class CacheService implements CacheServiceMBean
             }
             ByteBuffer key = ByteBufferUtil.read(input, keyLength);
             int generation = input.readInt();
-            SSTableReader reader = findDesc(generation, cfs.getSSTables());
             input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-            if (reader == null)
+            SSTableReader reader = null;
+            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
             {
-                RowIndexEntry.Serializer.skipPromotedIndex(input);
+                RowIndexEntry.Serializer.skip(input);
                 return null;
             }
             RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version);
-            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
         }
 
         private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index d078203..17553f3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -25,8 +25,10 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.server.RMIServerSocketFactory;
+import java.util.List;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
@@ -36,6 +38,8 @@ import javax.management.remote.rmi.RMIConnectorServer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,7 +103,7 @@ public class CassandraDaemon
                     url.append("service:jmx:");
                     url.append("rmi://localhost/jndi/");
                     url.append("rmi://localhost:").append(jmxPort).append("/jmxrmi");
-                    
+
                     Map env = new HashMap();
                     env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, serverFactory);
 
@@ -144,7 +148,7 @@ public class CassandraDaemon
      */
     protected void setup()
     {
-        try 
+        try
         {
             logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName());
         }
@@ -330,11 +334,16 @@ public class CassandraDaemon
             }
         }
 
-        if (CacheService.instance.keyCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size());
 
-        if (CacheService.instance.rowCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size());
+        try
+        {
+            loadRowAndKeyCacheAsync().get();
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.warn("Error loading key or row cache", t);
+        }
 
         try
         {
@@ -429,6 +438,22 @@ public class CassandraDaemon
         completeSetup();
     }
 
+    /*
+     * Asynchronously load the row and key caches in one-off threads and return a compound future of the results.
+     * Error handling is pushed into the cache load, since cache loads are allowed to fail and failures are handled by logging.
+     */
+    private ListenableFuture<?> loadRowAndKeyCacheAsync()
+    {
+        final ListenableFuture<Integer> keyCacheLoad = CacheService.instance.keyCache.loadSavedAsync();
+
+        final ListenableFuture<Integer> rowCacheLoad = CacheService.instance.rowCache.loadSavedAsync();
+
+        @SuppressWarnings("unchecked")
+        ListenableFuture<List<Integer>> retval = Futures.successfulAsList(keyCacheLoad, rowCacheLoad);
+
+        return retval;
+    }
+
     @VisibleForTesting
     public void completeSetup()
     {
@@ -533,7 +558,7 @@ public class CassandraDaemon
                 logger.error("error registering MBean {}", MBEAN_NAME, e);
                 //Allow the server to start even if the bean can't be registered
             }
-            
+
             setup();
 
             if (pidFile != null)
@@ -625,15 +650,15 @@ public class CassandraDaemon
     {
         instance.activate();
     }
-    
+
     static class NativeAccess implements NativeAccessMBean
     {
         public boolean isAvailable()
         {
             return CLibrary.jnaAvailable();
         }
-        
-        public boolean isMemoryLockable() 
+
+        public boolean isMemoryLockable()
         {
             return CLibrary.jnaMemoryLockable();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f5950e3..431f163 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -719,10 +719,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         prepareToJoin();
 
-        // Has to be called after the host id has potentially changed in prepareToJoin().
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            if (cfs.metadata.isCounter())
-                cfs.initCounterCache();
+        try
+        {
+            CacheService.instance.counterCache.loadSavedAsync().get();
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.warn("Error loading counter cache", t);
+        }
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
         {
@@ -2443,8 +2448,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     /**
      * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
-     * 
-     * 
+     *
+     *
      * @param tag
      *            the tag given to the snapshot; may not be null or empty
      * @param columnFamilyList
@@ -3817,7 +3822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public synchronized void drain() throws IOException, InterruptedException, ExecutionException
     {
         inShutdownHook = true;
-        
+
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
@@ -3947,32 +3952,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public LinkedHashMap<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException
     {
-    	
+
     	if (keyspace != null)
     	{
     		Keyspace keyspaceInstance = Schema.instance.getKeyspaceInstance(keyspace);
 			if(keyspaceInstance == null)
 				throw new IllegalArgumentException("The keyspace " + keyspace + ", does not exist");
-    		
+
     		if(keyspaceInstance.getReplicationStrategy() instanceof LocalStrategy)
 				throw new IllegalStateException("Ownership values for keyspaces with LocalStrategy are meaningless");
     	}
     	else
     	{
         	List<String> nonSystemKeyspaces = Schema.instance.getNonSystemKeyspaces();
-        	
+
         	//system_traces is a non-system keyspace however it needs to be counted as one for this process
         	int specialTableCount = 0;
         	if (nonSystemKeyspaces.contains("system_traces"))
 			{
         		specialTableCount += 1;
 			}
-        	if (nonSystemKeyspaces.size() > specialTableCount) 	   		
+        	if (nonSystemKeyspaces.size() > specialTableCount)
         		throw new IllegalStateException("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless");
-        	
+
         	keyspace = "system_traces";
     	}
-    	
+
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
 
         Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f866610..8d8dd22 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -713,4 +713,20 @@ public class FBUtilities
         digest.update((byte) ((val >>>  8) & 0xFF));
         digest.update((byte)  ((val >>> 0) & 0xFF));
     }
+
+    public static byte[] toWriteUTFBytes(String s)
+    {
+        // Encode via DataOutputStream.writeUTF: 2-byte length prefix followed by modified UTF-8.
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(buffer))
+        {
+            out.writeUTF(s);
+        }
+        catch (IOException e)
+        {
+            // writeUTF rejects strings whose encoded form exceeds 65535 bytes
+            throw new RuntimeException(e);
+        }
+        return buffer.toByteArray();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index 28afef1..e6ef69e 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -59,9 +59,8 @@ public class AutoSavingCacheTest extends SchemaLoader
         Assert.assertEquals(0, keyCache.size());
 
         // then load saved
-        keyCache.loadSaved(cfs);
-        Assert.assertEquals(2, keyCache.size());
+        keyCache.loadSavedAsync().get();
         for (SSTableReader sstable : cfs.getSSTables())
-            Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
+            Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 71d4f80..63f89a4 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.cache;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.cassandra.cache;
  * under the License.
  *
  */
+package org.apache.cassandra.cache;
 
 
 import java.nio.ByteBuffer;
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.utils.Pair;
 
 import com.googlecode.concurrentlinkedhashmap.Weighers;
 
@@ -114,21 +115,20 @@ public class CacheProviderTest extends SchemaLoader
         simpleCase(cf, cache);
         concurrentCase(cf, cache);
     }
-    
+
     @Test
     public void testKeys()
     {
-        UUID cfId = UUID.randomUUID();
-
+        Pair<String, String> ksAndCFName = Pair.create(keyspaceName, cfName);
         byte[] b1 = {1, 2, 3, 4};
-        RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1));
+        RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1));
         byte[] b2 = {1, 2, 3, 4};
-        RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2));
+        RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2));
         assertEquals(key1, key2);
         assertEquals(key1.hashCode(), key2.hashCode());
-        
+
         byte[] b3 = {1, 2, 3, 5};
-        RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3));
+        RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3));
         assertNotSame(key1, key3);
         assertNotSame(key1.hashCode(), key3.hashCode());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
new file mode 100644
index 0000000..0e879e9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.metrics.CacheMetrics;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KeyCacheCqlTest extends CQLTester
+{
+    // Column definitions shared by the tables in these tests; ends with a comma because the PRIMARY KEY clause is appended.
+    static final String commonColumnsDef =
+    "part_key_a     int," +
+    "part_key_b     text," +
+    "clust_key_a    int," +
+    "clust_key_b    text," +
+    "clust_key_c    frozen<list<text>>," + // to make it really big
+    "col_text       text," +
+    "col_int        int," +
+    "col_long       bigint,";
+    static final String commonColumns =
+    "part_key_a," +
+    "part_key_b," +
+    "clust_key_a," +
+    "clust_key_b," +
+    "clust_key_c," + // the frozen<list<text>> clustering column
+    "col_text," +
+    "col_int," +
+    "col_long";
+
+    // Checks key cache hit/request counts for 2i queries, then a save/clear/load round trip, then loading after the table is dropped.
+    @Test
+    public void test2iKeyCachePaths() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s ("
+                                   + commonColumnsDef
+                                   + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+        createIndex("CREATE INDEX some_index ON %s (col_int)");
+        insertData(table, "some_index", true);
+        clearCache();
+
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            assertEquals(500, result.size());
+        }
+
+        long hits = metrics.hits.count();
+        long requests = metrics.requests.count();
+        assertEquals(4900, hits);
+        assertEquals(5250, requests);
+
+        // Repeat the same queries; the counters checked below are cumulative across both passes.
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        metrics = CacheService.instance.keyCache.getMetrics();
+        hits = metrics.hits.count();
+        requests = metrics.requests.count();
+        assertEquals(10000, hits);
+        assertEquals(10500, requests);
+
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+        int beforeSize = CacheService.instance.keyCache.size();
+
+        CacheService.instance.keyCache.clear();
+        Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        assertEquals(beforeSize, CacheService.instance.keyCache.size());
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        // Test Schema.getColumnFamilyStoreIncludingIndexes with an unknown keyspace and an unknown table;
+        // several of its null check paths are defensive and unreachable.
+        assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+        assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+
+        dropTable("DROP TABLE %s");
+
+        //Test loading for a dropped 2i/table
+        CacheService.instance.keyCache.clear();
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        assertEquals(0, CacheService.instance.keyCache.size());
+    }
+
+    // Keys cached for a table (and its 2i) that is dropped before the cache is saved must not come back on load.
+    @Test
+    public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s ("
+                                   + commonColumnsDef
+                                   + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+        createIndex("CREATE INDEX some_index ON %s (col_int)");
+        insertData(table, "some_index", true);
+        clearCache();
+
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+        // first pass over the 10 indexed values
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            assertEquals(500, result.size());
+        }
+
+        long hits = metrics.hits.count();
+        long requests = metrics.requests.count();
+        assertEquals(4900, hits);
+        assertEquals(5250, requests);
+
+        // second pass: same queries again; the counters checked below are cumulative
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        metrics = CacheService.instance.keyCache.getMetrics();
+        hits = metrics.hits.count();
+        requests = metrics.requests.count();
+        assertEquals(10000, hits);
+        assertEquals(10500, requests);
+
+        // drop the table first, then save the cache and empty it
+        dropTable("DROP TABLE %s");
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.keyCache.clear();
+        Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet())
+        {
+            // compare against the KEYSPACE constant (the keyspace holding the table), not a string literal
+            Assert.assertFalse(key.ksAndCFName.left.equals(KEYSPACE));
+            Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+        }
+    }
+
+    // Populates the table with 100 partitions, flushing after every 10th partition so the data is
+    // split over 10 sstables; with clustering each partition receives 50 CQL rows, otherwise 1.
+    private void insertData(String table, String index, boolean withClustering) throws Throwable
+    {
+        final boolean hasIndex = index != null;
+        final String insertCql = "INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
+        final int rowsPerPartition = withClustering ? 50 : 1;
+
+        // disable auto-compaction and start from a flushed, truncated table (and flushed index)
+        StorageService.instance.disableAutoCompaction(KEYSPACE, table);
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking();
+        if (hasIndex)
+        {
+            StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index);
+            Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+        }
+
+        for (int partition = 0; partition < 100; partition++)
+        {
+            String partKeyB = Integer.toOctalString(partition);
+            for (int row = 0; row < rowsPerPartition; row++)
+            {
+                String clustKeyB = Integer.toOctalString(row);
+                String colText = String.valueOf(partition) + '-' + String.valueOf(row);
+                execute(insertCql,
+                        partition, partKeyB,
+                        row, clustKeyB, makeList(clustKeyB),
+                        colText, partition % 10, (long) row);
+            }
+
+            // every 10th partition closes out one sstable (data first, then the index)
+            if (partition % 10 != 9)
+                continue;
+            Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+            if (hasIndex)
+                Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+        }
+    }
+
+    // Returns the 50 strings value + 0 .. value + 49, in that order.
+    private static List<String> makeList(String value)
+    {
+        final int count = 50;
+        List<String> entries = new ArrayList<>(count);
+        for (int suffix = 0; entries.size() < count; suffix++)
+            entries.add(value + suffix);
+        return entries;
+    }
+
+    // Unregisters all metrics from the default registry, empties the key cache and checks its metrics read zero.
+    private static void clearCache()
+    {
+        ImmutableSet<MetricName> registered = ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet());
+        for (MetricName metricName : registered)
+            Metrics.defaultRegistry().removeMetric(metricName);
+        CacheService.instance.keyCache.clear();
+        CacheMetrics cleared = CacheService.instance.keyCache.getMetrics();
+        Assert.assertEquals(0, cleared.entries.value().intValue());
+        Assert.assertEquals(0L, cleared.hits.count());
+        Assert.assertEquals(0L, cleared.requests.count());
+        Assert.assertEquals(0L, cleared.size.value().longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index cb2d97a..20e067c 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -23,6 +23,7 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -48,6 +49,7 @@ public class CounterCacheTest extends SchemaLoader
     public void testReadWrite()
     {
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
         CacheService.instance.invalidateCounterCache();
 
         assertEquals(0, CacheService.instance.counterCache.size());
@@ -72,6 +74,7 @@ public class CounterCacheTest extends SchemaLoader
     public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
     {
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
         CacheService.instance.invalidateCounterCache();
 
         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
@@ -86,11 +89,76 @@ public class CounterCacheTest extends SchemaLoader
         assertEquals(0, CacheService.instance.counterCache.size());
 
         // load from cache and validate
-        CacheService.instance.counterCache.loadSaved(cfs);
+        CacheService.instance.counterCache.loadSaved();
         assertEquals(4, CacheService.instance.counterCache.size());
         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
     }
+
+    // Counter cache entries saved for a keyspace whose instance is no longer registered in Schema must not be loaded.
+    @Test
+    public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
+
+        // flush the counter cache and invalidate
+        CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateCounterCache();
+        assertEquals(0, CacheService.instance.counterCache.size());
+
+        Keyspace ks = Schema.instance.removeKeyspaceInstance(KS);
+        try
+        {
+            // load the saved cache; nothing may be restored while the keyspace instance is removed
+            CacheService.instance.counterCache.loadSaved();
+            assertEquals(0, CacheService.instance.counterCache.size());
+        }
+        finally
+        {
+            // put the keyspace instance back so the removal does not outlive this test
+            Schema.instance.storeKeyspaceInstance(ks);
+        }
+    }
+
+    // With the counter cache capacity set to 0, loading the saved cache must leave the cache empty.
+    @Test
+    public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
+
+        // flush the counter cache and invalidate
+        CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateCounterCache();
+        assertEquals(0, CacheService.instance.counterCache.size());
+
+        CacheService.instance.setCounterCacheCapacityInMB(0);
+        try
+        {
+            // load the saved cache while the capacity is 0 and check that nothing was restored
+            CacheService.instance.counterCache.loadSaved();
+            assertEquals(0, CacheService.instance.counterCache.size());
+        }
+        finally
+        {
+            CacheService.instance.setCounterCacheCapacityInMB(1); // NOTE(review): restores a fixed 1 MB, not the prior capacity - confirm against the test config
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1f7024e..4a4c7d5 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -86,7 +86,7 @@ public class KeyCacheTest extends SchemaLoader
         CacheService.instance.invalidateKeyCache();
         assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2);
 
-        CacheService.instance.keyCache.loadSaved(store);
+        CacheService.instance.keyCache.loadSaved();
         assertKeyCacheSize(savedMap.size(), KEYSPACE1, COLUMN_FAMILY2);
 
         // probably it's better to add equals/hashCode to RowIndexEntry...