You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/16 22:04:42 UTC

[1/7] cassandra git commit: bump versions for 2.0.17

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 8b2dc1f25 -> e63dacf79


bump versions for 2.0.17


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3aff4491
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3aff4491
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3aff4491

Branch: refs/heads/cassandra-2.2
Commit: 3aff44915edbd2bf07955d5b30fd47bf9c4698da
Parents: ae4cd69
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Sep 16 13:39:39 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Sep 16 13:39:39 2015 -0400

----------------------------------------------------------------------
 NEWS.txt         | 8 ++++++++
 build.xml        | 2 +-
 debian/changelog | 6 ++++++
 3 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aff4491/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9bfe803..4b3fdf4 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,14 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+2.0.17
+======
+
+Upgrading
+---------
+    - Nothing specific to this release, but refer to previous entries if you
+      are upgrading from a previous version.
+
 2.0.16
 ======
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aff4491/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index b019ec3..85b5ea0 100644
--- a/build.xml
+++ b/build.xml
@@ -25,7 +25,7 @@
     <property name="debuglevel" value="source,lines,vars"/>
 
     <!-- default version and SCM information -->
-    <property name="base.version" value="2.0.16"/>
+    <property name="base.version" value="2.0.17"/>
     <property name="scm.connection" value="scm:git://git.apache.org/cassandra.git"/>
     <property name="scm.developerConnection" value="scm:git://git.apache.org/cassandra.git"/>
     <property name="scm.url" value="http://git-wip-us.apache.org/repos/asf?p=cassandra.git;a=tree"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aff4491/debian/changelog
----------------------------------------------------------------------
diff --git a/debian/changelog b/debian/changelog
index 527eb29..de1883f 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+cassandra (2.0.17) unstable; urgency=medium
+
+  * New release
+
+ -- Jake Luciani <ja...@apache.org>  Wed, 16 Sep 2015 13:39:00 -0400
+
 cassandra (2.0.16) unstable; urgency=medium
 
   * New release 


[3/7] cassandra git commit: 2i key cache load fails

Posted by sn...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 7b6ff99..6d4554d 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -37,6 +38,7 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RowCacheTest extends SchemaLoader
 {
@@ -156,6 +158,42 @@ public class RowCacheTest extends SchemaLoader
         rowCacheLoad(100, 50, 0);
         CacheService.instance.setRowCacheCapacityInMB(0);
     }
+
+    @Test
+    public void testRowCacheDropSaveLoad() throws Exception
+    {
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        rowCacheLoad(100, 50, 0);
+        CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
+        Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE);
+        try
+        {
+            CacheService.instance.rowCache.size();
+            CacheService.instance.rowCache.clear();
+            CacheService.instance.rowCache.loadSaved();
+            int after = CacheService.instance.rowCache.size();
+            assertEquals(0, after);
+        }
+        finally
+        {
+            Schema.instance.storeKeyspaceInstance(instance);
+        }
+    }
+
+    @Test
+    public void testRowCacheDisabled() throws Exception
+    {
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        rowCacheLoad(100, 50, 0);
+        CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.setRowCacheCapacityInMB(0);
+        CacheService.instance.rowCache.size();
+        CacheService.instance.rowCache.clear();
+        CacheService.instance.rowCache.loadSaved();
+        int after = CacheService.instance.rowCache.size();
+        assertEquals(0, after);
+    }
+
     @Test
     public void testRowCacheRange()
     {
@@ -174,7 +212,7 @@ public class RowCacheTest extends SchemaLoader
 
         ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
         DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
-        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
         Mutation mutation = new Mutation(KEYSPACE, key);
         for (int i = 0; i < 200; i++)
             mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
@@ -251,6 +289,6 @@ public class RowCacheTest extends SchemaLoader
         // empty the cache again to make sure values came from disk
         CacheService.instance.invalidateRowCache();
         assert CacheService.instance.rowCache.size() == 0;
-        assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
+        assert CacheService.instance.rowCache.loadSaved() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
     }
 }


[5/7] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by sn...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index f427389,0000000..5c51fbb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -1,261 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public class BigTableReader extends SSTableReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 +
 +    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
 +    {
 +        return new SSTableNamesIterator(this, key, columns);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
 +    {
 +        return new SSTableNamesIterator(this, input, key, columns, indexEntry);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
 +    {
 +        return new SSTableSliceIterator(this, key, slices, reverse);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
 +    {
 +        return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
 +    }
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
 +    {
 +        return BigTableScanner.getScanner(this, dataRange, limiter);
 +    }
 +
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
 +    {
 +        return BigTableScanner.getScanner(this, ranges, limiter);
 +    }
 +
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds but only if op != * EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
 +    {
 +        if (op == Operator.EQ)
 +        {
 +            assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key
 +            if (!bf.isPresent((DecoratedKey)key))
 +            {
 +                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        // next, the key cache (only make sense for valid row key)
 +        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
 +        {
 +            DecoratedKey decoratedKey = (DecoratedKey)key;
-             KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
++            KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, decoratedKey.getKey());
 +            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
 +            if (cachedPosition != null)
 +            {
 +                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
 +                return cachedPosition;
 +            }
 +        }
 +
 +        // check the smallest and greatest keys in the sstable to see if it can't be present
 +        boolean skip = false;
 +        if (key.compareTo(first) < 0)
 +        {
 +            if (op == Operator.EQ)
 +                skip = true;
 +            else
 +                key = first;
 +
 +            op = Operator.EQ;
 +        }
 +        else
 +        {
 +            int l = last.compareTo(key);
 +            // l <= 0  => we may be looking past the end of the file; we then narrow our behaviour to:
 +            //             1) skipping if strictly greater for GE and EQ;
 +            //             2) skipping if equal and searching GT, and we aren't permitting matching past last
 +            skip = l <= 0 && (l < 0 || (!permitMatchPastLast && op == Operator.GT));
 +        }
 +        if (skip)
 +        {
 +            if (op == Operator.EQ && updateCacheAndStats)
 +                bloomFilterTracker.addFalsePositive();
 +            Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
 +            return null;
 +        }
 +
 +        int binarySearchResult = indexSummary.binarySearch(key);
 +        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
 +        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
 +
 +        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
 +
 +        if (ifile == null)
 +            return null;
 +
 +        // scan the on-disk index, starting at the nearest sampled position.
 +        // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present
 +        // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
 +        // next index position because the searched key can be greater the last key of the index interval checked if it
 +        // is lesser than the first key of next interval (and in that case we must return the position of the first key
 +        // of the next interval).
 +        int i = 0;
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            String path = null;
 +            try (FileDataInput in = segments.next())
 +            {
 +                path = in.getPath();
 +                while (!in.isEOF())
 +                {
 +                    i++;
 +
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +
 +                    boolean opSatisfied; // did we find an appropriate position for the op requested
 +                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
 +
 +                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
 +                    if (op == Operator.EQ && i <= effectiveInterval)
 +                    {
 +                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
 +                    }
 +                    else
 +                    {
 +                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                        int comparison = indexDecoratedKey.compareTo(key);
 +                        int v = op.apply(comparison);
 +                        opSatisfied = (v == 0);
 +                        exactMatch = (comparison == 0);
 +                        if (v < 0)
 +                        {
 +                            Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
 +                            return null;
 +                        }
 +                    }
 +
 +                    if (opSatisfied)
 +                    {
 +                        // read data position from index entry
 +                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
 +                        if (exactMatch && updateCacheAndStats)
 +                        {
 +                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
 +                            DecoratedKey decoratedKey = (DecoratedKey)key;
 +
 +                            if (logger.isTraceEnabled())
 +                            {
 +                                // expensive sanity check!  see CASSANDRA-4687
 +                                try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
 +                                {
 +                                    DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
 +                                    if (!keyInDisk.equals(key))
 +                                        throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
 +                                }
 +                            }
 +
 +                            // store exact match for the key
 +                            cacheKey(decoratedKey, indexEntry);
 +                        }
 +                        if (op == Operator.EQ && updateCacheAndStats)
 +                            bloomFilterTracker.addTruePositive();
 +                        Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
 +                        return indexEntry;
 +                    }
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, path);
 +            }
 +        }
 +
 +        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
 +            bloomFilterTracker.addFalsePositive();
 +        Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
 +        return null;
 +    }
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a775627,50d8903..a13a52d
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -267,9 -282,9 +266,9 @@@ public class CacheService implements Ca
          keyCache.clear();
      }
  
-     public void invalidateKeyCacheForCf(UUID cfId)
+     public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator();
 +        Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator();
          while (keyCacheIterator.hasNext())
          {
              KeyCacheKey key = keyCacheIterator.next();
@@@ -283,9 -298,9 +282,9 @@@
          rowCache.clear();
      }
  
-     public void invalidateRowCacheForCf(UUID cfId)
+     public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator();
 +        Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator();
          while (rowCacheIterator.hasNext())
          {
              RowCacheKey rowCacheKey = rowCacheIterator.next();
@@@ -294,9 -309,9 +293,9 @@@
          }
      }
  
-     public void invalidateCounterCacheForCf(UUID cfId)
+     public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator();
 +        Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator();
          while (counterCacheIterator.hasNext())
          {
              CounterCacheKey counterCacheKey = counterCacheIterator.next();
@@@ -423,7 -487,7 +435,7 @@@
              ByteBufferUtil.writeWithLength(key.key, out);
              out.writeInt(key.desc.generation);
              out.writeBoolean(true);
-             key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out);
 -            cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out);
++            key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out);
          }
  
          public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@@ -436,15 -502,15 +450,15 @@@
              }
              ByteBuffer key = ByteBufferUtil.read(input, keyLength);
              int generation = input.readInt();
-             SSTableReader reader = findDesc(generation, cfs.getSSTables());
              input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-             if (reader == null)
+             SSTableReader reader = null;
+             if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
              {
-                 RowIndexEntry.Serializer.skipPromotedIndex(input);
+                 RowIndexEntry.Serializer.skip(input);
                  return null;
              }
 -            RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version);
 +            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
-             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
          }
  
          private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 2020201,17553f3..075c8f7
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -25,25 -25,22 +25,27 @@@ import java.net.InetAddress
  import java.net.UnknownHostException;
  import java.rmi.registry.LocateRegistry;
  import java.rmi.server.RMIServerSocketFactory;
 +import java.util.Collections;
+ import java.util.List;
 -import java.util.*;
 +import java.util.Map;
 +import java.util.UUID;
  import java.util.concurrent.TimeUnit;
+ 
  import javax.management.MBeanServer;
- import javax.management.MalformedObjectNameException;
  import javax.management.ObjectName;
  import javax.management.StandardMBean;
  import javax.management.remote.JMXConnectorServer;
  import javax.management.remote.JMXServiceURL;
  import javax.management.remote.rmi.RMIConnectorServer;
  
 +import com.codahale.metrics.Meter;
 +import com.codahale.metrics.MetricRegistryListener;
 +import com.codahale.metrics.SharedMetricRegistries;
  import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.Uninterruptibles;
 +import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -353,29 -465,6 +376,29 @@@ public class CassandraDaemo
          return setupCompleted;
      }
  
 +    private void logSystemInfo()
 +    {
 +    	if (logger.isInfoEnabled())
 +    	{
 +	        try
 +	        {
 +	            logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName());
 +	        }
 +	        catch (UnknownHostException e1)
 +	        {
 +	            logger.info("Could not resolve local host");
 +	        }
- 	
++
 +	        logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version"));
 +	        logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
- 	
++
 +	        for(MemoryPoolMXBean pool: ManagementFactory.getMemoryPoolMXBeans())
 +	            logger.info("{} {}: {}", pool.getName(), pool.getType(), pool.getPeakUsage());
- 	
++
 +	        logger.info("Classpath: {}", System.getProperty("java.class.path"));
 +    	}
 +    }
 +
      /**
       * Initialize the Cassandra Daemon based on the given <a
       * href="http://commons.apache.org/daemon/jsvc.html">Commons

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 38736dc,431f163..fa370dc
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -683,15 -717,18 +683,22 @@@ public class StorageService extends Not
          }, "StorageServiceShutdownHook");
          Runtime.getRuntime().addShutdownHook(drainOnShutdown);
  
 +        replacing = DatabaseDescriptor.isReplacing();
 +
          prepareToJoin();
  
 +        // Has to be called after the host id has potentially changed in prepareToJoin().
-         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-             if (cfs.metadata.isCounter())
-                 cfs.initCounterCache();
+         try
+         {
+             CacheService.instance.counterCache.loadSavedAsync().get();
+         }
+         catch (Throwable t)
+         {
+             JVMStabilityInspector.inspectThrowable(t);
+             logger.warn("Error loading counter cache", t);
+         }
+ 
 +
          if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
          {
              joinTokenRing(delay);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index fe26616,63f89a4..bfcfa59
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@@ -24,17 -24,14 +24,17 @@@ package org.apache.cassandra.cache
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;
--import java.util.UUID;
  
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.ArrayBackedSortedColumns;
  import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.Pair;
  
  import com.googlecode.concurrentlinkedhashmap.Weighers;
  
@@@ -132,17 -119,16 +132,16 @@@ public class CacheProviderTes
      @Test
      public void testKeys()
      {
-         UUID cfId = UUID.randomUUID();
- 
 -        Pair<String, String> ksAndCFName = Pair.create(keyspaceName, cfName);
++        Pair<String, String> ksAndCFName = Pair.create(KEYSPACE1, CF_STANDARD1);
          byte[] b1 = {1, 2, 3, 4};
-         RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1));
+         RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1));
          byte[] b2 = {1, 2, 3, 4};
-         RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2));
+         RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2));
          assertEquals(key1, key2);
          assertEquals(key1.hashCode(), key2.hashCode());
-         
+ 
          byte[] b3 = {1, 2, 3, 5};
-         RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3));
+         RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3));
          assertNotSame(key1, key3);
          assertNotSame(key1.hashCode(), key3.hashCode());
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 0000000,0e879e9..1a60d6d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@@ -1,0 -1,263 +1,266 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.cql3;
+ 
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.List;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.ImmutableSet;
 -import com.yammer.metrics.Metrics;
 -import com.yammer.metrics.core.MetricName;
+ 
+ import org.apache.cassandra.cache.KeyCacheKey;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.metrics.CacheMetrics;
++import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+ import org.apache.cassandra.service.CacheService;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNull;
+ 
+ public class KeyCacheCqlTest extends CQLTester
+ {
+ 
+     static final String commonColumnsDef =
+     "part_key_a     int," +
+     "part_key_b     text," +
+     "clust_key_a    int," +
+     "clust_key_b    text," +
+     "clust_key_c    frozen<list<text>>," + // to make it really big
+     "col_text       text," +
+     "col_int        int," +
+     "col_long       bigint,";
+     static final String commonColumns =
+     "part_key_a," +
+     "part_key_b," +
+     "clust_key_a," +
+     "clust_key_b," +
+     "clust_key_c," + // to make it really big
+     "col_text," +
+     "col_int," +
+     "col_long";
+ 
+     @Test
+     public void test2iKeyCachePaths() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                    + commonColumnsDef
+                                    + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+ 
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
 -        long hits = metrics.hits.count();
 -        long requests = metrics.requests.count();
++        long hits = metrics.hits.getCount();
++        long requests = metrics.requests.getCount();
+         assertEquals(4900, hits);
+         assertEquals(5250, requests);
+ 
+         //
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+             assertEquals(500, result.size());
+         }
+ 
+         metrics = CacheService.instance.keyCache.getMetrics();
 -        hits = metrics.hits.count();
 -        requests = metrics.requests.count();
++        hits = metrics.hits.getCount();
++        requests = metrics.requests.getCount();
+         assertEquals(10000, hits);
+         assertEquals(10500, requests);
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         int beforeSize = CacheService.instance.keyCache.size();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(beforeSize, CacheService.instance.keyCache.size());
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+             assertEquals(500, result.size());
+         }
+ 
+         //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths
+         //are defensive and unreachable
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+         assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+ 
+         dropTable("DROP TABLE %s");
+ 
+         //Test loading for a dropped 2i/table
+         CacheService.instance.keyCache.clear();
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+ 
+     @Test
+     public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                    + commonColumnsDef
+                                    + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+ 
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
 -        long hits = metrics.hits.count();
 -        long requests = metrics.requests.count();
++        long hits = metrics.hits.getCount();
++        long requests = metrics.requests.getCount();
+         assertEquals(4900, hits);
+         assertEquals(5250, requests);
+ 
+         //
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+             assertEquals(500, result.size());
+         }
+ 
+         metrics = CacheService.instance.keyCache.getMetrics();
 -        hits = metrics.hits.count();
 -        requests = metrics.requests.count();
++        hits = metrics.hits.getCount();
++        requests = metrics.requests.getCount();
+         assertEquals(10000, hits);
+         assertEquals(10500, requests);
+ 
+         dropTable("DROP TABLE %s");
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
 -        for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet())
++        Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
++        while(iter.hasNext())
+         {
++            KeyCacheKey key = iter.next();
+             Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+             Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+         }
+     }
+ 
+     // Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
+     // Clustered tables receive 50 CQL rows per partition.
+     private void insertData(String table, String index, boolean withClustering) throws Throwable
+     {
+         StorageService.instance.disableAutoCompaction(KEYSPACE, table);
+         Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+         Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking();
+         if (index != null)
+         {
+             StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index);
+             Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+         }
+ 
+         for (int i = 0; i < 100; i++)
+         {
+             int partKeyA = i;
+             String partKeyB = Integer.toOctalString(i);
+             for (int c = 0; c < (withClustering ? 50 : 1); c++)
+             {
+                 int clustKeyA = c;
+                 String clustKeyB = Integer.toOctalString(c);
+                 List<String> clustKeyC = makeList(clustKeyB);
+                 String colText = String.valueOf(i) + '-' + String.valueOf(c);
+                 int colInt = i % 10;
+                 long colLong = c;
+                 execute("INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+                         partKeyA, partKeyB,
+                         clustKeyA, clustKeyB, clustKeyC,
+                         colText, colInt, colLong);
+             }
+ 
+             if (i % 10 == 9)
+             {
+                 Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+                 if (index != null)
+                     Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+             }
+         }
+     }
+ 
+     private static List<String> makeList(String value)
+     {
+         List<String> list = new ArrayList<>(50);
+         for (int i = 0; i < 50; i++)
+         {
+             list.add(value + i);
+         }
+         return list;
+     }
+ 
+     private static void clearCache()
+     {
 -        for (MetricName name : ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet()))
++        for (String name : ImmutableSet.copyOf(CassandraMetricsRegistry.Metrics.getMetrics().keySet()))
+         {
 -            Metrics.defaultRegistry().removeMetric(name);
++            CassandraMetricsRegistry.Metrics.remove(name);
+         }
++
+         CacheService.instance.keyCache.clear();
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
 -        Assert.assertEquals(0, metrics.entries.value().intValue());
 -        Assert.assertEquals(0L, metrics.hits.count());
 -        Assert.assertEquals(0L, metrics.requests.count());
 -        Assert.assertEquals(0L, metrics.size.value().longValue());
++        Assert.assertEquals(0, metrics.entries.getValue().intValue());
++        Assert.assertEquals(0L, metrics.hits.getCount());
++        Assert.assertEquals(0L, metrics.requests.getCount());
++        Assert.assertEquals(0L, metrics.size.getValue().longValue());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 71f8b20,20e067c..5b37b2c
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -24,11 -23,8 +24,12 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.utils.FBUtilities;
  
@@@ -62,7 -48,8 +63,8 @@@ public class CounterCacheTes
      @Test
      public void testReadWrite()
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
          assertEquals(0, CacheService.instance.counterCache.size());
@@@ -86,7 -73,8 +88,8 @@@
      @Test
      public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
          ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
@@@ -108,4 -96,69 +111,69 @@@
          assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
          assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
      }
+ 
+     @Test
+     public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
+         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
 -        Keyspace ks = Schema.instance.removeKeyspaceInstance(KS);
++        Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1);
+ 
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(ks);
+         }
+     }
+ 
+     @Test
+     public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
+         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
+ 
+         CacheService.instance.setCounterCacheCapacityInMB(0);
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             CacheService.instance.setCounterCacheCapacityInMB(1);
+         }
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index a4b7514,6d4554d..5912d7c
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -28,9 -27,8 +28,10 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.cache.CachingOptions;
  import org.apache.cassandra.cache.RowCacheKey;
 +import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.composites.*;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.filter.QueryFilter;
@@@ -177,6 -158,42 +178,42 @@@ public class RowCacheTes
          rowCacheLoad(100, 50, 0);
          CacheService.instance.setRowCacheCapacityInMB(0);
      }
+ 
+     @Test
+     public void testRowCacheDropSaveLoad() throws Exception
+     {
+         CacheService.instance.setRowCacheCapacityInMB(1);
+         rowCacheLoad(100, 50, 0);
+         CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
 -        Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE);
++        Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE_CACHED);
+         try
+         {
+             CacheService.instance.rowCache.size();
+             CacheService.instance.rowCache.clear();
+             CacheService.instance.rowCache.loadSaved();
+             int after = CacheService.instance.rowCache.size();
+             assertEquals(0, after);
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(instance);
+         }
+     }
+ 
+     @Test
+     public void testRowCacheDisabled() throws Exception
+     {
+         CacheService.instance.setRowCacheCapacityInMB(1);
+         rowCacheLoad(100, 50, 0);
+         CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.setRowCacheCapacityInMB(0);
+         CacheService.instance.rowCache.size();
+         CacheService.instance.rowCache.clear();
+         CacheService.instance.rowCache.loadSaved();
+         int after = CacheService.instance.rowCache.size();
+         assertEquals(0, after);
+     }
+ 
      @Test
      public void testRowCacheRange()
      {
@@@ -195,8 -212,8 +232,8 @@@
  
          ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
          DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
 -        Mutation mutation = new Mutation(KEYSPACE, key);
 +        Mutation mutation = new Mutation(KEYSPACE_CACHED, key);
          for (int i = 0; i < 200; i++)
              mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
          mutation.applyUnsafe();
@@@ -271,7 -288,7 +308,7 @@@
  
          // empty the cache again to make sure values came from disk
          CacheService.instance.invalidateRowCache();
 -        assert CacheService.instance.rowCache.size() == 0;
 -        assert CacheService.instance.rowCache.loadSaved() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
 +        assertEquals(0, CacheService.instance.rowCache.size());
-         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
++        assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved());
      }
  }


[4/7] cassandra git commit: 2i key cache load fails

Posted by sn...@apache.org.
2i key cache load fails

patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-10155


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e889ee40
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e889ee40
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e889ee40

Branch: refs/heads/cassandra-2.2
Commit: e889ee408bec5330c312ff6b72a81a0012fdf2a5
Parents: 6479d94
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Sep 16 21:57:54 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Sep 16 21:57:54 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 172 +++++++-----
 .../org/apache/cassandra/cache/CacheKey.java    |  14 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  26 +-
 .../org/apache/cassandra/cache/KeyCacheKey.java |  19 +-
 .../org/apache/cassandra/cache/RowCacheKey.java |  30 +--
 .../org/apache/cassandra/config/CFMetaData.java |   9 +
 .../cassandra/config/DatabaseDescriptor.java    |  15 +-
 .../org/apache/cassandra/config/Schema.java     |  56 +++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  75 ++----
 src/java/org/apache/cassandra/db/Keyspace.java  |   4 -
 .../org/apache/cassandra/db/RowIndexEntry.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  21 +-
 .../cassandra/io/sstable/SSTableReader.java     |   8 +-
 .../apache/cassandra/service/CacheService.java  |  58 ++--
 .../cassandra/service/CassandraDaemon.java      |  45 +++-
 .../cassandra/service/StorageService.java       |  31 ++-
 .../org/apache/cassandra/utils/FBUtilities.java |  16 ++
 .../cassandra/cache/AutoSavingCacheTest.java    |   5 +-
 .../cassandra/cache/CacheProviderTest.java      |  16 +-
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  | 263 +++++++++++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   |  70 ++++-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  42 ++-
 24 files changed, 739 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2787739..207f16a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.10
+ * Fix cache handling of 2i and base tables (CASSANDRA-10155)
  * Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
  * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
  * BATCH statement is broken in cqlsh (CASSANDRA-10272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 98e3e59..3ebbc76 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cache;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +29,10 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -60,7 +66,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     protected final CacheService.CacheType cacheType;
 
     private CacheSerializer<K, V> cacheLoader;
-    private static final String CURRENT_VERSION = "b";
+
+    /*
+     * CASSANDRA-10155 required a format change to fix 2i indexes and caching.
+     * 2.2 is already at version "c" and 3.0 is at "d".
+     *
+     * Since cache versions match exactly and there is no partial fallback just add
+     * a minor version letter.
+     */
+    private static final String CURRENT_VERSION = "ba";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {
@@ -88,16 +102,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         this.cacheLoader = cacheloader;
     }
 
-    @Deprecated
-    public File getCachePath(String ksName, String cfName, UUID cfId, String version)
-    {
-        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cfId, cacheType, version);
-    }
-
-    public File getCachePath(UUID cfId, String version)
+    public File getCachePath(String version)
     {
-        Pair<String, String> names = Schema.instance.getCF(cfId);
-        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version);
+        return DatabaseDescriptor.getSerializedCachePath(cacheType, version);
     }
 
     public Writer getWriter(int keysToSave)
@@ -128,16 +135,42 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         }
     }
 
-    public int loadSaved(ColumnFamilyStore cfs)
+    public ListenableFuture<Integer> loadSavedAsync()
+    {
+        final ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        final long start = System.nanoTime();
+
+        ListenableFuture<Integer> cacheLoad = es.submit(new Callable<Integer>()
+        {
+            @Override
+            public Integer call() throws Exception
+            {
+                return loadSaved();
+            }
+        });
+        cacheLoad.addListener(new Runnable() {
+            @Override
+            public void run()
+            {
+                if (size() > 0)
+                    logger.info("Completed loading ({} ms; {} keys) {} cache",
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
+                            CacheService.instance.keyCache.size(),
+                            cacheType);
+                es.shutdown();
+            }
+        }, MoreExecutors.sameThreadExecutor());
+
+        return cacheLoad;
+    }
+
+    public int loadSaved()
     {
         int count = 0;
         long start = System.nanoTime();
 
         // modern format, allows both key and value (so key cache load can be purely sequential)
-        File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
-        // if path does not exist, try without cfId (assuming saved cache is created with current CF)
-        if (!path.exists())
-            path = getCachePath(cfs.keyspace.getName(), cfs.name, null, CURRENT_VERSION);
+        File path = getCachePath(CURRENT_VERSION);
         if (path.exists())
         {
             DataInputStream in = null;
@@ -145,28 +178,57 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             {
                 logger.info(String.format("reading saved cache %s", path));
                 in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
-                List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+                ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
+
                 while (in.available() > 0)
                 {
-                    Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+                    //ksname and cfname are serialized by the serializers in CacheService
+                    //That is delegated there because there are serializer specific conditions
+                    //where a cache key is skipped and not written
+                    String ksname = in.readUTF();
+                    String cfname = in.readUTF();
+
+                    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+
+                    Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                     // Key cache entry can return null, if the SSTable doesn't exist.
-                    if (entry == null)
+                    if (entryFuture == null)
                         continue;
-                    futures.add(entry);
+
+                    futures.offer(entryFuture);
                     count++;
+
+                    /*
+                     * Kind of unwise to accrue an unbounded number of pending futures
+                     * So now there is this loop to keep a bounded number pending.
+                     */
+                    do
+                    {
+                        while (futures.peek() != null && futures.peek().isDone())
+                        {
+                            Future<Pair<K, V>> future = futures.poll();
+                            Pair<K, V> entry = future.get();
+                            if (entry != null && entry.right != null)
+                                put(entry.left, entry.right);
+                        }
+
+                        if (futures.size() > 1000)
+                            Thread.yield();
+                    } while(futures.size() > 1000);
                 }
 
-                for (Future<Pair<K, V>> future : futures)
+                Future<Pair<K, V>> future = null;
+                while ((future = futures.poll()) != null)
                 {
                     Pair<K, V> entry = future.get();
                     if (entry != null && entry.right != null)
                         put(entry.left, entry.right);
                 }
             }
-            catch (Exception e)
+            catch (Throwable t)
             {
-                JVMStabilityInspector.inspectThrowable(e);
-                logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
+                JVMStabilityInspector.inspectThrowable(t);
+                logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t);
             }
             finally
             {
@@ -238,44 +300,33 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-            HashMap<UUID, OutputStream> streams = new HashMap<>();
-            HashMap<UUID, File> paths = new HashMap<>();
-
+            DataOutputStreamPlus writer = null;
+            File tempCacheFile = tempCacheFile();
             try
             {
+                try
+                {
+                    writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile));
+                }
+                catch (FileNotFoundException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
                 for (K key : keys)
                 {
-                    UUID cfId = key.getCFId();
-                    if (!Schema.instance.hasCF(key.getCFId()))
-                        continue; // the table has been dropped.
 
-                    DataOutputPlus writer = writers.get(cfId);
-                    if (writer == null)
-                    {
-                        File writerPath = tempCacheFile(cfId);
-                        OutputStream stream;
-                        try
-                        {
-                            stream = streamFactory.getOutputStream(writerPath);
-                            writer = new DataOutputStreamPlus(stream);
-                        }
-                        catch (FileNotFoundException e)
-                        {
-                            throw new RuntimeException(e);
-                        }
-                        paths.put(cfId, writerPath);
-                        streams.put(cfId, stream);
-                        writers.put(cfId, writer);
-                    }
+                    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+                    if (cfs == null)
+                        continue; // the table or 2i has been dropped.
 
                     try
                     {
-                        cacheLoader.serialize(key, writer);
+                        cacheLoader.serialize(key, writer, cfs);
                     }
                     catch (IOException e)
                     {
-                        throw new FSWriteError(e, paths.get(cfId));
+                        throw new FSWriteError(e, tempCacheFile);
                     }
 
                     keysWritten++;
@@ -283,28 +334,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             finally
             {
-                for (OutputStream writer : streams.values())
+                if (writer != null)
                     FileUtils.closeQuietly(writer);
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
-            {
-                UUID cfId = entry.getKey();
+            File cacheFile = getCachePath(CURRENT_VERSION);
 
-                File tmpFile = paths.get(cfId);
-                File cacheFile = getCachePath(cfId, CURRENT_VERSION);
+            cacheFile.delete(); // ignore error if it didn't exist
 
-                cacheFile.delete(); // ignore error if it didn't exist
-                if (!tmpFile.renameTo(cacheFile))
-                    logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
-            }
+            if (!tempCacheFile.renameTo(cacheFile))
+                logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile);
 
             logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private File tempCacheFile(UUID cfId)
+        private File tempCacheFile()
         {
-            File path = getCachePath(cfId, CURRENT_VERSION);
+            File path = getCachePath(CURRENT_VERSION);
             return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
         }
 
@@ -337,7 +383,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
     public interface CacheSerializer<K extends CacheKey, V>
     {
-        void serialize(K key, DataOutputPlus out) throws IOException;
+        void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException;
 
         Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CacheKey.java b/src/java/org/apache/cassandra/cache/CacheKey.java
index 44fead0..0e82990 100644
--- a/src/java/org/apache/cassandra/cache/CacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CacheKey.java
@@ -17,12 +17,14 @@
  */
 package org.apache.cassandra.cache;
 
-import java.util.UUID;
+import org.apache.cassandra.utils.Pair;
 
-public interface CacheKey extends IMeasurableMemory
+public abstract class CacheKey implements IMeasurableMemory
 {
-    /**
-     * @return The cf id of the cache key.
-     */
-    public UUID getCFId();
+    public final Pair<String, String> ksAndCFName;
+
+    public CacheKey(Pair<String, String> ksAndCFName)
+    {
+        this.ksAndCFName = ksAndCFName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/CounterCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
index 60247c5..68856eb 100644
--- a/src/java/org/apache/cassandra/cache/CounterCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java
@@ -19,36 +19,28 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.utils.*;
 
-public class CounterCacheKey implements CacheKey
+public final class CounterCacheKey extends CacheKey
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1))))
-                                           + ObjectSizes.measure(new UUID(0, 0));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1))));
 
-    public final UUID cfId;
     public final byte[] partitionKey;
     public final byte[] cellName;
 
-    private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    private CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
     {
-        this.cfId = cfId;
+        super(ksAndCFName);
         this.partitionKey = ByteBufferUtil.getArray(partitionKey);
         this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
     }
 
-    public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName)
+    public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName)
     {
-        return new CounterCacheKey(cfId, partitionKey, cellName);
-    }
-
-    public UUID getCFId()
-    {
-        return cfId;
+        return new CounterCacheKey(ksAndCFName, partitionKey, cellName);
     }
 
     public long unsharedHeapSize()
@@ -62,7 +54,7 @@ public class CounterCacheKey implements CacheKey
     public String toString()
     {
         return String.format("CounterCacheKey(%s, %s, %s)",
-                             cfId,
+                             ksAndCFName,
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)),
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName)));
     }
@@ -70,7 +62,7 @@ public class CounterCacheKey implements CacheKey
     @Override
     public int hashCode()
     {
-        return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName});
+        return Arrays.deepHashCode(new Object[]{ksAndCFName, partitionKey, cellName});
     }
 
     @Override
@@ -84,7 +76,7 @@ public class CounterCacheKey implements CacheKey
 
         CounterCacheKey cck = (CounterCacheKey) o;
 
-        return cfId.equals(cck.cfId)
+        return ksAndCFName.equals(cck.ksAndCFName)
             && Arrays.equals(partitionKey, cck.partitionKey)
             && Arrays.equals(cellName, cck.cellName);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/KeyCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index cef37ce..222622c 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -19,15 +19,14 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
 
-public class KeyCacheKey implements CacheKey
+public class KeyCacheKey extends CacheKey
 {
-    public final UUID cfId;
     public final Descriptor desc;
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new KeyCacheKey(null, null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
@@ -36,19 +35,15 @@ public class KeyCacheKey implements CacheKey
     // without extra copies on lookup since client-provided key ByteBuffers will be array-backed already
     public final byte[] key;
 
-    public KeyCacheKey(UUID cfId, Descriptor desc, ByteBuffer key)
+    public KeyCacheKey(Pair<String, String> ksAndCFName, Descriptor desc, ByteBuffer key)
     {
-        this.cfId = cfId;
+
+        super(ksAndCFName);
         this.desc = desc;
         this.key = ByteBufferUtil.getArray(key);
         assert this.key != null;
     }
 
-    public UUID getCFId()
-    {
-        return cfId;
-    }
-
     public String toString()
     {
         return String.format("KeyCacheKey(%s, %s)", desc, ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key)));
@@ -67,13 +62,13 @@ public class KeyCacheKey implements CacheKey
 
         KeyCacheKey that = (KeyCacheKey) o;
 
-        return cfId.equals(that.cfId) && desc.equals(that.desc) && Arrays.equals(key, that.key);
+        return ksAndCFName.equals(that.ksAndCFName) && desc.equals(that.desc) && Arrays.equals(key, that.key);
     }
 
     @Override
     public int hashCode()
     {
-        int result = cfId.hashCode();
+        int result = ksAndCFName.hashCode();
         result = 31 * result + desc.hashCode();
         result = 31 * result + Arrays.hashCode(key);
         return result;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index af2d4d4..c959fd1 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -19,37 +19,30 @@ package org.apache.cassandra.cache;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.UUID;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
 
-public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
+public final class RowCacheKey extends CacheKey
 {
-    public final UUID cfId;
     public final byte[] key;
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-    public RowCacheKey(UUID cfId, DecoratedKey key)
+    public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key)
     {
-        this(cfId, key.getKey());
+        this(ksAndCFName, key.getKey());
     }
 
-    public RowCacheKey(UUID cfId, ByteBuffer key)
+    public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key)
     {
-        this.cfId = cfId;
+        super(ksAndCFName);
         this.key = ByteBufferUtil.getArray(key);
         assert this.key != null;
     }
 
-    public UUID getCFId()
-    {
-        return cfId;
-    }
-
     public long unsharedHeapSize()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOfArray(key);
@@ -63,25 +56,20 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
 
         RowCacheKey that = (RowCacheKey) o;
 
-        return cfId.equals(that.cfId) && Arrays.equals(key, that.key);
+        return ksAndCFName.equals(that.ksAndCFName) && Arrays.equals(key, that.key);
     }
 
     @Override
     public int hashCode()
     {
-        int result = cfId.hashCode();
+        int result = ksAndCFName.hashCode();
         result = 31 * result + (key != null ? Arrays.hashCode(key) : 0);
         return result;
     }
 
-    public int compareTo(RowCacheKey otherKey)
-    {
-        return (cfId.compareTo(otherKey.cfId) < 0) ? -1 : ((cfId.equals(otherKey.cfId)) ?  FBUtilities.compareUnsigned(key, otherKey.key, 0, 0, key.length, otherKey.key.length) : 1);
-    }
-
     @Override
     public String toString()
     {
-        return String.format("RowCacheKey(cfId:%s, key:%s)", cfId, Arrays.toString(key));
+        return String.format("RowCacheKey(ksAndCFName:%s, key:%s)", ksAndCFName, Arrays.toString(key));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 2c6a30c..2939f09 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.github.jamm.Unmetered;
 
@@ -385,6 +386,8 @@ public final class CFMetaData
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
     public final String cfName;                       // name of this column family
+    public final Pair<String, String> ksAndCFName;
+    public final byte[] ksAndCFBytes;
     public final ColumnFamilyType cfType;             // standard, super
     public volatile CellNameType comparator;          // bytes, long, timeuuid, utf8, etc.
 
@@ -475,6 +478,12 @@ public final class CFMetaData
         cfId = id;
         ksName = keyspace;
         cfName = name;
+        ksAndCFName = Pair.create(keyspace, name);
+        byte[] ksBytes = FBUtilities.toWriteUTFBytes(ksName);
+        byte[] cfBytes = FBUtilities.toWriteUTFBytes(cfName);
+        ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length);
+        System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length);
+
         cfType = type;
         comparator = comp;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3a6a8fd..84381a0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1437,16 +1437,11 @@ public class DatabaseDescriptor
         return conf.index_interval;
     }
 
-    public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version)
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append(ksName).append('-');
-        builder.append(cfName).append('-');
-        if (cfId != null)
-            builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
-        builder.append(cacheType);
-        builder.append((version == null ? "" : "-" + version + ".db"));
-        return new File(conf.saved_caches_directory, builder.toString());
+    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version)
+    {
+        String name = cacheType.toString()
+                + (version == null ? "" : "-" + version + ".db");
+        return new File(conf.saved_caches_directory, name);
     }
 
     public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8e9802f..fada670 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.service.MigrationManager;
@@ -129,6 +130,53 @@ public class Schema
         return keyspaceInstances.get(keyspaceName);
     }
 
+    /**
+     * Retrieve a CFS by name even if that CFS is an index
+     *
+     * An index is identified by looking for '.' in the CF name and separating to find the base table
+     * containing the index
+     * @param ksNameAndCFName
+     * @return The named CFS or null if the keyspace, base table, or index don't exist
+     */
+    public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair<String, String> ksNameAndCFName) {
+        String ksName = ksNameAndCFName.left;
+        String cfName = ksNameAndCFName.right;
+        Pair<String, String> baseTable;
+
+        /*
+         * Split does special case a one character regex, and it looks like it can detect
+         * if you use two characters to escape '.', but it still allocates a useless array.
+         */
+        int indexOfSeparator = cfName.indexOf('.');
+        if (indexOfSeparator > -1)
+            baseTable = Pair.create(ksName, cfName.substring(0, indexOfSeparator));
+        else
+            baseTable = ksNameAndCFName;
+
+        UUID cfId = cfIdMap.get(baseTable);
+        if (cfId == null)
+            return null;
+
+        Keyspace ks = keyspaceInstances.get(ksName);
+        if (ks == null)
+            return null;
+
+        ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId);
+
+        //Not an index
+        if (indexOfSeparator == -1)
+            return baseCFS;
+
+        if (baseCFS == null)
+            return null;
+
+        SecondaryIndex index = baseCFS.indexManager.getIndexByName(cfName);
+        if (index == null)
+            return null;
+
+        return index.getIndexCfs();
+    }
+
     public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
     {
         Pair<String, String> pair = cfIdMap.inverse().get(cfId);
@@ -302,12 +350,12 @@ public class Schema
     }
 
     /**
-     * @param cfId The identifier of the ColumnFamily to lookup
-     * @return true if the CF id is a known one, false otherwise.
+     * @param ksAndCFName The identifier of the ColumnFamily to lookup
+     * @return true if the KS and CF pair is a known one, false otherwise.
      */
-    public boolean hasCF(UUID cfId)
+    public boolean hasCF(Pair<String, String> ksAndCFName)
     {
-        return cfIdMap.containsValue(cfId);
+        return cfIdMap.containsKey(ksAndCFName);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 25b3e57..ffaa276 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -358,8 +358,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
 
-        CachingOptions caching = metadata.getCaching();
-
         logger.info("Initializing {}.{}", keyspace.getName(), name);
 
         // scan for sstables corresponding to this cf and load them
@@ -372,9 +370,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             data.addInitialSSTables(sstables);
         }
 
-        if (caching.keyCache.isEnabled())
-            CacheService.instance.keyCache.loadSaved(this);
-
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategyWrapper = new WrappingCompactionStrategy(this);
 
@@ -632,7 +627,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions)
     {
         Directories directories = new Directories(metadata);
-
         Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
@@ -702,39 +696,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    // must be called after all sstables are loaded since row cache merges all row versions
-    public void initRowCache()
-    {
-        if (!isRowCacheEnabled())
-            return;
-
-        long start = System.nanoTime();
-
-        int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
-        if (cachedRowsRead > 0)
-            logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                        cachedRowsRead,
-                        keyspace.getName(),
-                        name);
-    }
-
-    public void initCounterCache()
-    {
-        if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
-            return;
-
-        long start = System.nanoTime();
-
-        int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
-        if (cachedShardsRead > 0)
-            logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
-                        cachedShardsRead,
-                        keyspace.getName(),
-                        name);
-    }
-
     /**
      * See #{@code StorageService.loadNewSSTables(String, String)} for more info
      *
@@ -1246,7 +1207,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (!isRowCacheEnabled())
             return;
 
-        RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key);
+        RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key);
         invalidateCachedRow(cacheKey);
     }
 
@@ -1696,7 +1657,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         assert isRowCacheEnabled()
                : String.format("Row cache is not enabled on column family [" + name + "]");
 
-        RowCacheKey key = new RowCacheKey(cfId, filter.key);
+        RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
 
         // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
         // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
@@ -2068,7 +2029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
         {
             DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+            if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                 invalidateCachedRow(dk);
         }
 
@@ -2077,7 +2038,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
             {
                 DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+                if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                     CacheService.instance.counterCache.remove(key);
             }
         }
@@ -2527,16 +2488,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (!isRowCacheEnabled())
             return null;
 
-        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key));
+        IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.ksAndCFName, key));
         return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached;
     }
 
     private void invalidateCaches()
     {
-        CacheService.instance.invalidateKeyCacheForCf(metadata.cfId);
-        CacheService.instance.invalidateRowCacheForCf(metadata.cfId);
+        CacheService.instance.invalidateKeyCacheForCf(metadata.ksAndCFName);
+        CacheService.instance.invalidateRowCacheForCf(metadata.ksAndCFName);
         if (metadata.isCounter())
-            CacheService.instance.invalidateCounterCacheForCf(metadata.cfId);
+            CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
     }
 
     /**
@@ -2544,7 +2505,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public boolean containsCachedRow(DecoratedKey key)
     {
-        return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key));
+        return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key));
     }
 
     public void invalidateCachedRow(RowCacheKey key)
@@ -2558,21 +2519,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (cfId == null)
             return; // secondary index
 
-        invalidateCachedRow(new RowCacheKey(cfId, key));
+        invalidateCachedRow(new RowCacheKey(metadata.ksAndCFName, key));
     }
 
     public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return null;
-        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
+        return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName));
     }
 
     public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
     {
         if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
             return;
-        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
+        CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
     }
 
     public void forceMajorCompaction() throws InterruptedException, ExecutionException
@@ -3008,11 +2969,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
     }
 
-    private boolean isRowCacheEnabled()
+    public boolean isRowCacheEnabled()
     {
         return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
     }
 
+    public boolean isCounterCacheEnabled()
+    {
+        return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0;
+    }
+
+    public boolean isKeyCacheEnabled()
+    {
+        return metadata.getCaching().keyCache.isEnabled() && CacheService.instance.keyCache.getCapacity() > 0;
+    }
+
     /**
      * Discard all SSTables that were created before given timestamp.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 4f59c40..03c3d2b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -120,10 +120,6 @@ public class Keyspace
                     // open and store the keyspace
                     keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
                     schema.storeKeyspaceInstance(keyspaceInstance);
-
-                    // keyspace has to be constructed and in the cache before cacheRow can be called
-                    for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
-                        cfs.initRowCache();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 01035c4..77b745c 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -142,7 +142,7 @@ public class RowIndexEntry implements IMeasurableMemory
             skipPromotedIndex(in);
         }
 
-        public static void skipPromotedIndex(DataInput in) throws IOException
+        private static void skipPromotedIndex(DataInput in) throws IOException
         {
             int size = in.readInt();
             if (size <= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index a9ae069..ad3aae8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -95,7 +95,8 @@ public class SecondaryIndexManager
     /**
      * Keeps all secondary index instances, either per-column or per-row
      */
-    private final Set<SecondaryIndex> allIndexes;
+    private final Collection<SecondaryIndex> allIndexes;
+    private final Map<String, SecondaryIndex> indexesByName;
 
 
     /**
@@ -107,7 +108,8 @@ public class SecondaryIndexManager
     {
         indexesByColumn = new ConcurrentSkipListMap<>();
         rowLevelIndexMap = new ConcurrentHashMap<>();
-        allIndexes = Collections.newSetFromMap(new ConcurrentHashMap<SecondaryIndex, Boolean>());
+        indexesByName = new ConcurrentHashMap<String, SecondaryIndex>();
+        allIndexes = indexesByName.values();
 
         this.baseCfs = baseCfs;
     }
@@ -158,7 +160,7 @@ public class SecondaryIndexManager
     {
         idxNames = filterByColumn(idxNames);
         if (idxNames.isEmpty())
-            return;        
+            return;
 
         logger.info(String.format("Submitting index build of %s for data in %s",
                                   idxNames, StringUtils.join(sstables, ", ")));
@@ -172,7 +174,7 @@ public class SecondaryIndexManager
         logger.info("Index build of {} complete", idxNames);
     }
 
-    public boolean indexes(CellName name, Set<SecondaryIndex> indexes)
+    public boolean indexes(CellName name, Collection<SecondaryIndex> indexes)
     {
         boolean matching = false;
         for (SecondaryIndex index : indexes)
@@ -186,7 +188,7 @@ public class SecondaryIndexManager
         return matching;
     }
 
-    public Set<SecondaryIndex> indexFor(CellName name, Set<SecondaryIndex> indexes)
+    public Set<SecondaryIndex> indexFor(CellName name, Collection<SecondaryIndex> indexes)
     {
         Set<SecondaryIndex> matching = null;
         for (SecondaryIndex index : indexes)
@@ -319,7 +321,7 @@ public class SecondaryIndexManager
         indexesByColumn.put(cdef.name.bytes, index);
 
         // Add to all indexes set:
-        allIndexes.add(index);
+        indexesByName.put(index.getIndexName(), index);
 
         // if we're just linking in the index to indexedColumns on an
         // already-built index post-restart, we're done
@@ -422,11 +424,16 @@ public class SecondaryIndexManager
     /**
      * @return all of the secondary indexes without distinction to the (non-)backed by secondary ColumnFamilyStore.
      */
-    public Set<SecondaryIndex> getIndexes()
+    public Collection<SecondaryIndex> getIndexes()
     {
         return allIndexes;
     }
 
+    public SecondaryIndex getIndexByName(String name)
+    {
+        return indexesByName.get(name);
+    }
+
     /**
      * @return if there are ANY indexes for this table..
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 86f6a23..0f307b0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1513,7 +1513,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
 
     public KeyCacheKey getCacheKey(DecoratedKey key)
     {
-        return new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+        return new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
     }
 
     public void cacheKey(DecoratedKey key, RowIndexEntry info)
@@ -1527,14 +1527,14 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             return;
         }
 
-        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
+        KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
         logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
         keyCache.put(cacheKey, info);
     }
 
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
-        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
+        return getCachedPosition(new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey()), updateStats);
     }
 
     private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
@@ -1596,7 +1596,7 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
         {
             DecoratedKey decoratedKey = (DecoratedKey)key;
-            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
+            KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, decoratedKey.getKey());
             RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
             if (cachedPosition != null)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a43d6d5..50d8903 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -283,13 +282,13 @@ public class CacheService implements CacheServiceMBean
         keyCache.clear();
     }
 
-    public void invalidateKeyCacheForCf(UUID cfId)
+    public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator();
         while (keyCacheIterator.hasNext())
         {
             KeyCacheKey key = keyCacheIterator.next();
-            if (key.cfId.equals(cfId))
+            if (key.ksAndCFName.equals(ksAndCFName))
                 keyCacheIterator.remove();
         }
     }
@@ -299,24 +298,24 @@ public class CacheService implements CacheServiceMBean
         rowCache.clear();
     }
 
-    public void invalidateRowCacheForCf(UUID cfId)
+    public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator();
         while (rowCacheIterator.hasNext())
         {
             RowCacheKey rowCacheKey = rowCacheIterator.next();
-            if (rowCacheKey.cfId.equals(cfId))
+            if (rowCacheKey.ksAndCFName.equals(ksAndCFName))
                 rowCacheIterator.remove();
         }
     }
 
-    public void invalidateCounterCacheForCf(UUID cfId)
+    public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName)
     {
         Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator();
         while (counterCacheIterator.hasNext())
         {
             CounterCacheKey counterCacheKey = counterCacheIterator.next();
-            if (counterCacheKey.cfId.equals(cfId))
+            if (counterCacheKey.ksAndCFName.equals(ksAndCFName))
                 counterCacheIterator.remove();
         }
     }
@@ -405,16 +404,24 @@ public class CacheService implements CacheServiceMBean
 
     public static class CounterCacheSerializer implements CacheSerializer<CounterCacheKey, ClockAndCount>
     {
-        public void serialize(CounterCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(CounterCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
+            assert(cfs.metadata.isCounter());
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.partitionKey, out);
             ByteBufferUtil.writeWithLength(key.cellName, out);
         }
 
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+            //parameter so they aren't deserialized here, even though they are serialized by this serializer
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            ByteBuffer cellNameBuffer = ByteBufferUtil.readWithLength(in);
+            if (cfs == null || !cfs.metadata.isCounter() || !cfs.isCounterCacheEnabled())
+                return null;
+            assert(cfs.metadata.isCounter());
+            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(cellNameBuffer);
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
@@ -431,7 +438,7 @@ public class CacheService implements CacheServiceMBean
                     if (cell == null || !cell.isLive(Long.MIN_VALUE))
                         return null;
                     ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    return Pair.create(CounterCacheKey.create(cfs.metadata.ksAndCFName, partitionKey, cellName), clockAndCount);
                 }
             });
         }
@@ -439,14 +446,22 @@ public class CacheService implements CacheServiceMBean
 
     public static class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
     {
-        public void serialize(RowCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(RowCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
+            assert(!cfs.isIndex());
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.key, out);
         }
 
         public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+            //parameter so they aren't deserialized here, even though they are serialized by this serializer
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            if (cfs == null  || !cfs.isRowCacheEnabled())
+                return null;
+            assert(!cfs.isIndex());
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
             {
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
@@ -454,7 +469,7 @@ public class CacheService implements CacheServiceMBean
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
                     QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
                     ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
+                    return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry) data);
                 }
             });
         }
@@ -462,24 +477,23 @@ public class CacheService implements CacheServiceMBean
 
     public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry>
     {
-        public void serialize(KeyCacheKey key, DataOutputPlus out) throws IOException
+        public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
             RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
             if (entry == null)
                 return;
 
-            CFMetaData cfm = Schema.instance.getCFMetaData(key.cfId);
-            if (cfm == null)
-                return; // the table no longer exists.
-
+            out.write(cfs.metadata.ksAndCFBytes);
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            cfm.comparator.rowIndexEntrySerializer().serialize(entry, out);
+            cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
         {
+            //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
+            //parameter so they aren't deserialized here, even though they are serialized by this serializer
             int keyLength = input.readInt();
             if (keyLength > FBUtilities.MAX_UNSIGNED_SHORT)
             {
@@ -488,15 +502,15 @@ public class CacheService implements CacheServiceMBean
             }
             ByteBuffer key = ByteBufferUtil.read(input, keyLength);
             int generation = input.readInt();
-            SSTableReader reader = findDesc(generation, cfs.getSSTables());
             input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-            if (reader == null)
+            SSTableReader reader = null;
+            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null)
             {
-                RowIndexEntry.Serializer.skipPromotedIndex(input);
+                RowIndexEntry.Serializer.skip(input);
                 return null;
             }
             RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version);
-            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
         }
 
         private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index d078203..17553f3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -25,8 +25,10 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.rmi.registry.LocateRegistry;
 import java.rmi.server.RMIServerSocketFactory;
+import java.util.List;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
@@ -36,6 +38,8 @@ import javax.management.remote.rmi.RMIConnectorServer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,7 +103,7 @@ public class CassandraDaemon
                     url.append("service:jmx:");
                     url.append("rmi://localhost/jndi/");
                     url.append("rmi://localhost:").append(jmxPort).append("/jmxrmi");
-                    
+
                     Map env = new HashMap();
                     env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, serverFactory);
 
@@ -144,7 +148,7 @@ public class CassandraDaemon
      */
     protected void setup()
     {
-        try 
+        try
         {
             logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName());
         }
@@ -330,11 +334,16 @@ public class CassandraDaemon
             }
         }
 
-        if (CacheService.instance.keyCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) key cache.", CacheService.instance.keyCache.size());
 
-        if (CacheService.instance.rowCache.size() > 0)
-            logger.info("completed pre-loading ({} keys) row cache.", CacheService.instance.rowCache.size());
+        try
+        {
+            loadRowAndKeyCacheAsync().get();
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.warn("Error loading key or row cache", t);
+        }
 
         try
         {
@@ -429,6 +438,22 @@ public class CassandraDaemon
         completeSetup();
     }
 
+    /*
+     * Asynchronously load the row and key cache in one off threads and return a compound future of the result.
+     * Error handling is pushed into the cache load since cache loads are allowed to fail and are handled by logging.
+     */
+    private ListenableFuture<?> loadRowAndKeyCacheAsync()
+    {
+        final ListenableFuture<Integer> keyCacheLoad = CacheService.instance.keyCache.loadSavedAsync();
+
+        final ListenableFuture<Integer> rowCacheLoad = CacheService.instance.rowCache.loadSavedAsync();
+
+        @SuppressWarnings("unchecked")
+        ListenableFuture<List<Integer>> retval = Futures.successfulAsList(keyCacheLoad, rowCacheLoad);
+
+        return retval;
+    }
+
     @VisibleForTesting
     public void completeSetup()
     {
@@ -533,7 +558,7 @@ public class CassandraDaemon
                 logger.error("error registering MBean {}", MBEAN_NAME, e);
                 //Allow the server to start even if the bean can't be registered
             }
-            
+
             setup();
 
             if (pidFile != null)
@@ -625,15 +650,15 @@ public class CassandraDaemon
     {
         instance.activate();
     }
-    
+
     static class NativeAccess implements NativeAccessMBean
     {
         public boolean isAvailable()
         {
             return CLibrary.jnaAvailable();
         }
-        
-        public boolean isMemoryLockable() 
+
+        public boolean isMemoryLockable()
         {
             return CLibrary.jnaMemoryLockable();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f5950e3..431f163 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -719,10 +719,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         prepareToJoin();
 
-        // Has to be called after the host id has potentially changed in prepareToJoin().
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            if (cfs.metadata.isCounter())
-                cfs.initCounterCache();
+        try
+        {
+            CacheService.instance.counterCache.loadSavedAsync().get();
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.warn("Error loading counter cache", t);
+        }
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
         {
@@ -2443,8 +2448,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     /**
      * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
-     * 
-     * 
+     *
+     *
      * @param tag
      *            the tag given to the snapshot; may not be null or empty
      * @param columnFamilyList
@@ -3817,7 +3822,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public synchronized void drain() throws IOException, InterruptedException, ExecutionException
     {
         inShutdownHook = true;
-        
+
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
@@ -3947,32 +3952,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public LinkedHashMap<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException
     {
-    	
+
     	if (keyspace != null)
     	{
     		Keyspace keyspaceInstance = Schema.instance.getKeyspaceInstance(keyspace);
 			if(keyspaceInstance == null)
 				throw new IllegalArgumentException("The keyspace " + keyspace + ", does not exist");
-    		
+
     		if(keyspaceInstance.getReplicationStrategy() instanceof LocalStrategy)
 				throw new IllegalStateException("Ownership values for keyspaces with LocalStrategy are meaningless");
     	}
     	else
     	{
         	List<String> nonSystemKeyspaces = Schema.instance.getNonSystemKeyspaces();
-        	
+
         	//system_traces is a non-system keyspace however it needs to be counted as one for this process
         	int specialTableCount = 0;
         	if (nonSystemKeyspaces.contains("system_traces"))
 			{
         		specialTableCount += 1;
 			}
-        	if (nonSystemKeyspaces.size() > specialTableCount) 	   		
+        	if (nonSystemKeyspaces.size() > specialTableCount)
         		throw new IllegalStateException("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless");
-        	
+
         	keyspace = "system_traces";
     	}
-    	
+
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
 
         Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f866610..8d8dd22 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -713,4 +713,20 @@ public class FBUtilities
         digest.update((byte) ((val >>>  8) & 0xFF));
         digest.update((byte)  ((val >>> 0) & 0xFF));
     }
+
+    public static byte[] toWriteUTFBytes(String s)
+    {
+        try
+        {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(baos);
+            dos.writeUTF(s);
+            dos.flush();
+            return baos.toByteArray();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index 28afef1..e6ef69e 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -59,9 +59,8 @@ public class AutoSavingCacheTest extends SchemaLoader
         Assert.assertEquals(0, keyCache.size());
 
         // then load saved
-        keyCache.loadSaved(cfs);
-        Assert.assertEquals(2, keyCache.size());
+        keyCache.loadSavedAsync().get();
         for (SSTableReader sstable : cfs.getSSTables())
-            Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
+            Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.ksAndCFName, sstable.descriptor, ByteBufferUtil.bytes("key1"))));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 71d4f80..63f89a4 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.cache;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.cassandra.cache;
  * under the License.
  *
  */
+package org.apache.cassandra.cache;
 
 
 import java.nio.ByteBuffer;
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.utils.Pair;
 
 import com.googlecode.concurrentlinkedhashmap.Weighers;
 
@@ -114,21 +115,20 @@ public class CacheProviderTest extends SchemaLoader
         simpleCase(cf, cache);
         concurrentCase(cf, cache);
     }
-    
+
     @Test
     public void testKeys()
     {
-        UUID cfId = UUID.randomUUID();
-
+        Pair<String, String> ksAndCFName = Pair.create(keyspaceName, cfName);
         byte[] b1 = {1, 2, 3, 4};
-        RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1));
+        RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1));
         byte[] b2 = {1, 2, 3, 4};
-        RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2));
+        RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2));
         assertEquals(key1, key2);
         assertEquals(key1.hashCode(), key2.hashCode());
-        
+
         byte[] b3 = {1, 2, 3, 5};
-        RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3));
+        RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3));
         assertNotSame(key1, key3);
         assertNotSame(key1.hashCode(), key3.hashCode());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
new file mode 100644
index 0000000..0e879e9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.metrics.CacheMetrics;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KeyCacheCqlTest extends CQLTester
+{
+
+    static final String commonColumnsDef =
+    "part_key_a     int," +
+    "part_key_b     text," +
+    "clust_key_a    int," +
+    "clust_key_b    text," +
+    "clust_key_c    frozen<list<text>>," + // to make it really big
+    "col_text       text," +
+    "col_int        int," +
+    "col_long       bigint,";
+    static final String commonColumns =
+    "part_key_a," +
+    "part_key_b," +
+    "clust_key_a," +
+    "clust_key_b," +
+    "clust_key_c," + // to make it really big
+    "col_text," +
+    "col_int," +
+    "col_long";
+
+    @Test
+    public void test2iKeyCachePaths() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s ("
+                                   + commonColumnsDef
+                                   + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+        createIndex("CREATE INDEX some_index ON %s (col_int)");
+        insertData(table, "some_index", true);
+        clearCache();
+
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            assertEquals(500, result.size());
+        }
+
+        long hits = metrics.hits.count();
+        long requests = metrics.requests.count();
+        assertEquals(4900, hits);
+        assertEquals(5250, requests);
+
+        //
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        metrics = CacheService.instance.keyCache.getMetrics();
+        hits = metrics.hits.count();
+        requests = metrics.requests.count();
+        assertEquals(10000, hits);
+        assertEquals(10500, requests);
+
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+        int beforeSize = CacheService.instance.keyCache.size();
+
+        CacheService.instance.keyCache.clear();
+
+        Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        assertEquals(beforeSize, CacheService.instance.keyCache.size());
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths
+        //are defensive and unreachable
+        assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar")));
+        assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar")));
+
+        dropTable("DROP TABLE %s");
+
+        //Test loading for a dropped 2i/table
+        CacheService.instance.keyCache.clear();
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        assertEquals(0, CacheService.instance.keyCache.size());
+    }
+
+    @Test
+    public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+    {
+        String table = createTable("CREATE TABLE %s ("
+                                   + commonColumnsDef
+                                   + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+        createIndex("CREATE INDEX some_index ON %s (col_int)");
+        insertData(table, "some_index", true);
+        clearCache();
+
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            assertEquals(500, result.size());
+        }
+
+        long hits = metrics.hits.count();
+        long requests = metrics.requests.count();
+        assertEquals(4900, hits);
+        assertEquals(5250, requests);
+
+        //
+
+        for (int i = 0; i < 10; i++)
+        {
+            UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i);
+            // 100 part-keys * 50 clust-keys
+            // indexed on part-key % 10 = 10 index partitions
+            // (50 clust-keys  *  100-part-keys  /  10 possible index-values) = 500
+            assertEquals(500, result.size());
+        }
+
+        metrics = CacheService.instance.keyCache.getMetrics();
+        hits = metrics.hits.count();
+        requests = metrics.requests.count();
+        assertEquals(10000, hits);
+        assertEquals(10500, requests);
+
+        dropTable("DROP TABLE %s");
+
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+
+        CacheService.instance.keyCache.clear();
+
+        Assert.assertEquals(0, CacheService.instance.keyCache.size());
+
+        // then load saved
+        CacheService.instance.keyCache.loadSaved();
+
+        for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet())
+        {
+            Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+            Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+        }
+    }
+
+    // Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
+    // Clustered tables receive 50 CQL rows per partition.
+    private void insertData(String table, String index, boolean withClustering) throws Throwable
+    {
+        StorageService.instance.disableAutoCompaction(KEYSPACE, table);
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+        Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking();
+        if (index != null)
+        {
+            StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index);
+            Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+        }
+
+        for (int i = 0; i < 100; i++)
+        {
+            int partKeyA = i;
+            String partKeyB = Integer.toOctalString(i);
+            for (int c = 0; c < (withClustering ? 50 : 1); c++)
+            {
+                int clustKeyA = c;
+                String clustKeyB = Integer.toOctalString(c);
+                List<String> clustKeyC = makeList(clustKeyB);
+                String colText = String.valueOf(i) + '-' + String.valueOf(c);
+                int colInt = i % 10;
+                long colLong = c;
+                execute("INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+                        partKeyA, partKeyB,
+                        clustKeyA, clustKeyB, clustKeyC,
+                        colText, colInt, colLong);
+            }
+
+            if (i % 10 == 9)
+            {
+                Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+                if (index != null)
+                    Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+            }
+        }
+    }
+
+    private static List<String> makeList(String value)
+    {
+        List<String> list = new ArrayList<>(50);
+        for (int i = 0; i < 50; i++)
+        {
+            list.add(value + i);
+        }
+        return list;
+    }
+
+    private static void clearCache()
+    {
+        for (MetricName name : ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet()))
+        {
+            Metrics.defaultRegistry().removeMetric(name);
+        }
+        CacheService.instance.keyCache.clear();
+        CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+        Assert.assertEquals(0, metrics.entries.value().intValue());
+        Assert.assertEquals(0L, metrics.hits.count());
+        Assert.assertEquals(0L, metrics.requests.count());
+        Assert.assertEquals(0L, metrics.size.value().longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index cb2d97a..20e067c 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -23,6 +23,7 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -48,6 +49,7 @@ public class CounterCacheTest extends SchemaLoader
     public void testReadWrite()
     {
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
         CacheService.instance.invalidateCounterCache();
 
         assertEquals(0, CacheService.instance.counterCache.size());
@@ -72,6 +74,7 @@ public class CounterCacheTest extends SchemaLoader
     public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
     {
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
         CacheService.instance.invalidateCounterCache();
 
         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
@@ -86,11 +89,76 @@ public class CounterCacheTest extends SchemaLoader
         assertEquals(0, CacheService.instance.counterCache.size());
 
         // load from cache and validate
-        CacheService.instance.counterCache.loadSaved(cfs);
+        CacheService.instance.counterCache.loadSaved();
         assertEquals(4, CacheService.instance.counterCache.size());
         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
         assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
         assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
     }
+
+    @Test
+    public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
+
+        // flush the counter cache and invalidate
+        CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateCounterCache();
+        assertEquals(0, CacheService.instance.counterCache.size());
+
+        Keyspace ks = Schema.instance.removeKeyspaceInstance(KS);
+
+        try
+        {
+            // load from cache and validate
+            CacheService.instance.counterCache.loadSaved();
+            assertEquals(0, CacheService.instance.counterCache.size());
+        }
+        finally
+        {
+            Schema.instance.storeKeyspaceInstance(ks);
+        }
+    }
+
+    @Test
+    public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
+        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
+
+        // flush the counter cache and invalidate
+        CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateCounterCache();
+        assertEquals(0, CacheService.instance.counterCache.size());
+
+
+        CacheService.instance.setCounterCacheCapacityInMB(0);
+        try
+        {
+            // load from cache and validate
+            CacheService.instance.counterCache.loadSaved();
+            assertEquals(0, CacheService.instance.counterCache.size());
+        }
+        finally
+        {
+            CacheService.instance.setCounterCacheCapacityInMB(1);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e889ee40/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1f7024e..4a4c7d5 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -86,7 +86,7 @@ public class KeyCacheTest extends SchemaLoader
         CacheService.instance.invalidateKeyCache();
         assertKeyCacheSize(0, KEYSPACE1, COLUMN_FAMILY2);
 
-        CacheService.instance.keyCache.loadSaved(store);
+        CacheService.instance.keyCache.loadSaved();
         assertKeyCacheSize(savedMap.size(), KEYSPACE1, COLUMN_FAMILY2);
 
         // probably it's better to add equals/hashCode to RowIndexEntry...


[6/7] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by sn...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index d843d4d,0000000..ce12206
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,2287 -1,0 +1,2287 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import com.codahale.metrics.Counter;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.lifecycle.Tracker;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.FSError;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * An SSTableReader can be constructed in a number of places, but typically is either
 + * read from disk at startup, or constructed from a flushed memtable, or after compaction
 + * to replace some existing sstables. However once created, an sstablereader may also be modified.
 + *
 + * A reader's OpenReason describes its current stage in its lifecycle, as follows:
-  * 
-  * 
++ *
++ *
 + * <pre> {@code
 + * NORMAL
 + * From:       None        => Reader has been read from disk, either at startup or from a flushed memtable
 + *             EARLY       => Reader is the final result of a compaction
 + *             MOVED_START => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
 + *
 + * EARLY
 + * From:       None        => Reader is a compaction replacement that is either incomplete and has been opened
 + *                            to represent its partial result status, or has been finished but the compaction
 + *                            it is a part of has not yet completed fully
 + *             EARLY       => Same as from None, only it is not the first time it has been
 + *
 + * MOVED_START
 + * From:       NORMAL      => Reader is being compacted. This compaction has not finished, but the compaction result
 + *                            is either partially or fully opened, to either partially or fully replace this reader.
 + *                            This reader's start key has been updated to represent this, so that reads only hit
 + *                            one or the other reader.
 + *
 + * METADATA_CHANGE
 + * From:       NORMAL      => Reader has seen low traffic and the amount of memory available for index summaries is
 + *                            constrained, so its index summary has been downsampled.
 + *         METADATA_CHANGE => Same
 + * } </pre>
 + *
 + * Note that in parallel to this, there are two different Descriptor types; TMPLINK and FINAL; the latter corresponds
 + * to NORMAL state readers and all readers that replace a NORMAL one. TMPLINK is used for EARLY state readers and
 + * no others.
 + *
 + * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
 + * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
 + * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
 + * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
 + * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
 + * macro compaction action that has not yet fully completed.
 + *
 + * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
 + * of if early opening is enabled.
 + *
 + * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
 + * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
 + * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
 + * all expire it releases its Refs to these underlying resources.
 + *
 + * There is some shared cleanup behaviour needed only once all sstablereaders in a certain stage of their lifecycle
 + * (i.e. EARLY or NORMAL opening), and some that must only occur once all readers of any kind over a single logical
 + * sstable have expired. These are managed by the TypeTidy and GlobalTidy classes at the bottom, and are effectively
 + * managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are
 + * cleaned up safely and can be debugged otherwise.
 + *
 + * TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies
 + */
 +public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    static
 +    {
 +        // Immediately remove readMeter sync task when cancelled.
 +        syncExecutor.setRemoveOnCancelPolicy(true);
 +    }
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    // it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
 +    public static final class UniqueIdentifier {}
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoc and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE,
 +        MOVED_START
 +    }
 +
 +    public final OpenReason openReason;
 +    public final UniqueIdentifier instanceId = new UniqueIdentifier();
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata);
 +    private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
 +
 +    private RestorableMeter readMeter;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If cardinality estimator is available on all given sstables, then this method use them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                if (sstable.openReason == OpenReason.EARLY)
 +                    continue;
 +
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    /**
 +     * Estimates how much of the keys we would keep if the sstables were compacted together
 +     */
 +    public static double estimateCompactionGain(Set<SSTableReader> overlapping)
 +    {
 +        Set<ICardinality> cardinalities = new HashSet<>(overlapping.size());
 +        for (SSTableReader sstable : overlapping)
 +        {
 +            try
 +            {
 +                ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator;
 +                if (cardinality != null)
 +                    cardinalities.add(cardinality);
 +                else
 +                    logger.debug("Got a null cardinality estimator in: {}", sstable.getFilename());
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Could not read up compaction metadata for {}", sstable, e);
 +            }
 +        }
 +        long totalKeyCountBefore = 0;
 +        for (ICardinality cardinality : cardinalities)
 +        {
 +            totalKeyCountBefore += cardinality.cardinality();
 +        }
 +        if (totalKeyCountBefore == 0)
 +            return 1;
 +
 +        long totalKeyCountAfter = mergeCardinalities(cardinalities).cardinality();
 +        logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, ((double)totalKeyCountAfter)/totalKeyCountBefore);
 +        return ((double)totalKeyCountAfter)/totalKeyCountBefore;
 +    }
 +
 +    private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities)
 +    {
 +        ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality
 +        try
 +        {
 +            base = base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()]));
 +        }
 +        catch (CardinalityMergeException e)
 +        {
 +            logger.warn("Could not merge cardinalities", e);
 +        }
 +        return base;
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true, true);
 +    }
 +
 +    // use only for offline or "Standalone" operations
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException
 +    {
 +        return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode(such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +            SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder())
 +        {
 +            if (!sstable.loadSummary(ibuilder, dbuilder))
 +                sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +            sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +            sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +            sstable.bf = FilterFactory.AlwaysPresent;
 +            sstable.setup(false);
 +            return sstable;
 +        }
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate,
 +                                      boolean trackHotness) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                                             statsMetadata, OpenReason.NORMAL);
 +        try
 +        {
 +            // load index and filter
 +            long start = System.nanoTime();
 +            sstable.load(validationMetadata);
 +            logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +            sstable.setup(trackHotness);
 +            if (validate)
 +                sstable.validate();
 +
 +            if (sstable.getKeyCache() != null)
 +                logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +            return sstable;
 +        }
 +        catch (Throwable t)
 +        {
 +            sstable.selfRef().release();
 +            throw t;
 +        }
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (CorruptSSTableException ex)
 +                    {
 +                        FileUtils.handleCorruptSSTable(ex);
 +                        logger.error("Corrupt sstable {}; skipping table", entry, ex);
 +                        return;
 +                    }
 +                    catch (FSError ex)
 +                    {
 +                        FileUtils.handleFSError(ex);
 +                        logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex);
 +                        return;
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Cannot read sstable {}; other IO error, skipping table", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +        reader.setup(true);
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.onDiskLength();
 +        return sum;
 +    }
 +
 +    public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.uncompressedLength();
 +
 +        return sum;
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path();
 +    }
 +
 +    public void setupKeyCache()
 +    {
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.PRIMARY_INDEX))
 +        {
 +            // avoid any reading of the missing primary index component.
 +            // this should only happen during StandaloneScrubber
 +            load(false, false);
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // bf fp chance in sstable metadata and it has changed since compaction.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        try (DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))))
 +        {
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
 +            SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
 +        {
 +            boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +            boolean builtSummary = false;
 +            if (recreateBloomFilter || !summaryLoaded)
 +            {
 +                buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +                builtSummary = true;
 +            }
 +
 +            if (components.contains(Component.PRIMARY_INDEX))
 +                ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +
 +            dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +
 +            // Check for an index summary that was downsampled even though the serialization format doesn't support
 +            // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for details.
 +        if (!descriptor.version.hasSamplingLevel() && !builtSummary && !validateSummarySamplingLevel() && ifile != null)
 +            {
 +                indexSummary.close();
 +                ifile.close();
 +                dfile.close();
 +
 +                logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling");
 +                FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
 +
 +                try(SegmentedFile.Builder ibuilderRebuild = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
 +                    SegmentedFile.Builder dbuilderRebuild = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
 +                {
 +                    buildSummary(false, ibuilderRebuild, dbuilderRebuild, false, Downsampling.BASE_SAMPLING_LEVEL);
 +                    ifile = ibuilderRebuild.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +                    dfile = dbuilderRebuild.complete(descriptor.filenameFor(Component.DATA));
 +                    saveSummary(ibuilderRebuild, dbuilderRebuild);
 +                }
 +            }
 +            else if (saveSummaryIfCreated && builtSummary)
 +            {
 +                saveSummary(ibuilder, dbuilder);
 +            }
 +        }
 +        catch (Throwable t)
 +        { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
 +            if (ifile != null)
 +            {
 +                ifile.close();
 +                ifile = null;
 +            }
 +
 +            if (dfile != null)
 +            {
 +                dfile.close();
 +                dfile = null;
 +            }
 +
 +            if (indexSummary != null)
 +            {
 +                indexSummary.close();
 +                indexSummary = null;
 +            }
 +
 +            throw t;
 +        }
 +    }
 +
 +    /**
 +     * Build index summary(and optionally bloom filter) by reading through Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if index summary is already loaded and not need to build again
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +         if (!components.contains(Component.PRIMARY_INDEX))
 +             return;
 +
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
 +            {
 +                long indexPosition;
 +                RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +                {
 +                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                    RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                    if (first == null)
 +                        first = decoratedKey;
 +                    last = decoratedKey;
 +
 +                    if (recreateBloomFilter)
 +                        bf.add(decoratedKey);
 +
 +                    // if summary was already read from disk we don't want to re-populate it using primary index
 +                    if (!summaryLoaded)
 +                    {
 +                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                        ibuilder.addPotentialBoundary(indexPosition);
 +                        dbuilder.addPotentialBoundary(indexEntry.position);
 +                    }
 +                }
 +
 +                if (!summaryLoaded)
 +                    indexSummary = summaryBuilder.build(partitioner);
 +            }
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * if loaded index summary has different index interval from current value stored in schema,
 +     * then Summary.db file will be deleted and this returns false to rebuild summary.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    @SuppressWarnings("resource")
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(
 +                    iStream, partitioner, descriptor.version.hasSamplingLevel(),
 +                    metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            if (indexSummary != null)
 +                indexSummary.close();
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; delete it and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            // delete it and fall back to creating a new summary
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Validates that an index summary has full sampling, as expected when the serialization format does not support
 +     * persisting the sampling level.
 +     * @return true if the summary has full sampling, false otherwise
 +     */
 +    private boolean validateSummarySamplingLevel()
 +    {
 +        // We need to check index summary entries against the index to verify that none of them were dropped due to
 +        // downsampling.  Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern
 +        // for the remainder of the summary).  Unfortunately, the first entry to be dropped is the entry at
 +        // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries.
 +        if (ifile == null)
 +            return false;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(0);
 +        int i = 0;
 +        int summaryEntriesChecked = 0;
 +        int expectedIndexInterval = getMinIndexInterval();
 +        while (segments.hasNext())
 +        {
 +            String path = null;
 +            try (FileDataInput in = segments.next())
 +            {
 +                path = in.getPath();
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    if (i % expectedIndexInterval == 0)
 +                    {
 +                        ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
 +                        if (!summaryKey.equals(indexKey))
 +                            return false;
 +                        summaryEntriesChecked++;
 +
 +                        if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
 +                            return true;
 +                    }
 +                    RowIndexEntry.Serializer.skip(in);
 +                    i++;
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, path);
 +            }
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary newSummary)
 +    {
 +        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, newSummary);
 +    }
 +    /**
 +     * Save index summary to Summary.db file.
 +     */
 +    public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last,
 +                                   SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
 +        {
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // corrupted hence delete it and let it load it now.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +    }
 +
 +    public void setReplaced()
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert !tidy.isReplaced;
 +            tidy.isReplaced = true;
 +        }
 +    }
 +
 +    public boolean isReplaced()
 +    {
 +        synchronized (tidy.global)
 +        {
 +            return tidy.isReplaced;
 +        }
 +    }
 +
 +    // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
 +    public void runOnClose(final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            final Runnable existing = tidy.runOnClose;
 +            tidy.runOnClose = AndThen.get(existing, runOnClose);
 +        }
 +    }
 +
 +    private static class AndThen implements Runnable
 +    {
 +        final Runnable runFirst;
 +        final Runnable runSecond;
 +
 +        private AndThen(Runnable runFirst, Runnable runSecond)
 +        {
 +            this.runFirst = runFirst;
 +            this.runSecond = runSecond;
 +        }
 +
 +        public void run()
 +        {
 +            runFirst.run();
 +            runSecond.run();
 +        }
 +
 +        static Runnable get(Runnable runFirst, Runnable runSecond)
 +        {
 +            if (runFirst == null)
 +                return runSecond;
 +            return new AndThen(runFirst, runSecond);
 +        }
 +    }
 +
 +    /**
 +     * Clone this reader with the provided start and open reason, and set the clone as replacement.
 +     *
 +     * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
 +     * opening of compaction results).
 +     * @param reason the {@code OpenReason} for the replacement.
 +     *
 +     * @return the cloned reader. That reader is set as a replacement by the method.
 +     */
 +    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason)
 +    {
 +        return cloneAndReplace(newFirst, reason, indexSummary.sharedCopy());
 +    }
 +
 +    /**
 +     * Clone this reader with the new values and set the clone as replacement.
 +     *
 +     * @param newFirst the first key for the replacement (which can be different from the original due to the pre-emptive
 +     * opening of compaction results).
 +     * @param reason the {@code OpenReason} for the replacement.
 +     * @param newSummary the index summary for the replacement.
 +     *
 +     * @return the cloned reader. That reader is set as a replacement by the method.
 +     */
 +    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason, IndexSummary newSummary)
 +    {
 +        SSTableReader replacement = internalOpen(descriptor,
 +                                                 components,
 +                                                 metadata,
 +                                                 partitioner,
 +                                                 ifile != null ? ifile.sharedCopy() : null,
 +                                                 dfile.sharedCopy(),
 +                                                 newSummary,
 +                                                 bf.sharedCopy(),
 +                                                 maxDataAge,
 +                                                 sstableMetadata,
 +                                                 reason);
 +        replacement.first = newFirst;
 +        replacement.last = last;
 +        replacement.isSuspect.set(isSuspect.get());
 +        return replacement;
 +    }
 +
 +    // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            // TODO: merge with caller's firstKeyBeyond() work,to save time
 +            if (newStart.compareTo(first) > 0)
 +            {
 +                final long dataStart = getPosition(newStart, Operator.EQ).position;
 +                final long indexStart = getIndexScanPosition(newStart);
 +                this.tidy.runOnClose = new DropPageCache(dfile, dataStart, ifile, indexStart, runOnClose);
 +            }
 +
 +            return cloneAndReplace(newStart, OpenReason.MOVED_START);
 +        }
 +    }
 +
 +    private static class DropPageCache implements Runnable
 +    {
 +        final SegmentedFile dfile;
 +        final long dfilePosition;
 +        final SegmentedFile ifile;
 +        final long ifilePosition;
 +        final Runnable andThen;
 +
 +        private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile, long ifilePosition, Runnable andThen)
 +        {
 +            this.dfile = dfile;
 +            this.dfilePosition = dfilePosition;
 +            this.ifile = ifile;
 +            this.ifilePosition = ifilePosition;
 +            this.andThen = andThen;
 +        }
 +
 +        public void run()
 +        {
 +            dfile.dropPageCache(dfilePosition);
 +
 +            if (ifile != null)
 +                ifile.dropPageCache(ifilePosition);
 +            andThen.run();
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    @SuppressWarnings("resource")
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        assert descriptor.version.hasSamplingLevel();
 +
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
 +                    SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
 +                {
 +                    saveSummary(ibuilder, dbuilder, newSummary);
 +                }
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel))
 +            {
 +                long indexPosition;
 +                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +                {
 +                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                    RowIndexEntry.Serializer.skip(primaryIndex);
 +                }
 +
 +                return summaryBuilder.build(partitioner);
 +            }
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public RestorableMeter getReadMeter()
 +    {
 +        return readMeter;
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary()
 +    {
 +        tidy.releaseSummary();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +        {
 +            selfRef().release();
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +        }
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary). Always returns a value >= 0
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0)
 +            key = first;
 +
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    @VisibleForTesting
 +    public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return 0;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    public static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        //We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the compression meta-data.
 +     * @return the amount of memory in bytes used off heap by the compression meta-data
 +     */
 +    public long getCompressionMetadataOffHeapSize()
 +    {
 +        if (!compression)
 +            return 0;
 +
 +        return getCompressionMetadata().offHeapSize();
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the bloom filter.
 +     * @return the amount of memory in bytes used off heap by the bloom filter
 +     */
 +    public long getBloomFilterOffHeapSize()
 +    {
 +        return bf.offHeapSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
 +        long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // left range are start exclusive
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // range are end inclusive so we use the previous index from what binarySearch give us
 +                // since that will be the last index we will return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already stricly greater that the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            assert !range.isWrapAround() || range.right.isMinimum();
 +            // truncate the range so it at most covers the sstable
 +            AbstractBounds<RowPosition> bounds = Range.makeRowRange(range);
 +            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
 +            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
 +
 +            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
 +                continue;
 +
 +            long left = getPosition(leftBound, Operator.GT).position;
 +            long right = (rightBound.compareTo(last) > 0)
 +                         ? uncompressedLength()
 +                         : getPosition(rightBound, Operator.GT).position;
 +
 +            if (left == right)
 +                // empty range
 +                continue;
 +
 +            assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public KeyCacheKey getCacheKey(DecoratedKey key)
 +    {
-         return new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
++        return new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
-         KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
++        KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
-         return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
++        return getCachedPosition(new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0 && metadata.getCaching().keyCache.isEnabled()) {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                {
 +                    keyCacheHit.incrementAndGet();
 +                    bloomFilterTracker.addTruePositive();
 +                }
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true, false);
 +    }
 +
 +    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
 +    {
 +        return getPosition(key, op, updateCacheAndStats, false);
 +    }
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds but only if op != * EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
 +
 +    //Corresponds to a name column
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    //Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        if (token.compareTo(first) < 0)
 +            return first;
 +
 +        long sampledPosition = getIndexScanPosition(token);
 +
 +        if (ifile == null)
 +            return null;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            String path = null;
 +            try (FileDataInput in = segments.next();)
 +            {
 +                path = in.getPath();
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, path);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * length())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if the this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete(Tracker tracker)
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (tidy.global)
 +        {
 +            assert !tidy.isReplaced;
 +        }
 +        if (!tidy.global.isCompacted.getAndSet(true))
 +        {
 +            tidy.type.markObsolete(this, tracker);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return tidy.global.isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ISSTableScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time).
 +     * This works in conjunction with maxDataAge which is an upper bound on the create of data in this sstable.
 +     * @param age The age to compare the maxDataAre of this sstable. Measured in millisec since epoc on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : 1; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            if (compactionMetadata != null)
 +                return compactionMetadata.ancestors;
 +            return Collections.emptySet();
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return dfile.createThrottledReader(limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return dfile.createReader();
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        if (ifile != null)
 +            return ifile.createReader();
 +        return null;
 +    }
 +
 +    /**
 +     * @param component component to get timestamp.
 +     * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hit
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache request
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public Ref<SSTableReader> tryRef()
 +    {
 +        return selfRef.tryRef();
 +    }
 +
 +    public Ref<SSTableReader> selfRef()
 +    {
 +        return selfRef;
 +    }
 +
 +    public Ref<SSTableReader> ref()
 +    {
 +        return selfRef.ref();
 +    }
 +
 +    void setup(boolean trackHotness)
 +    {
 +        tidy.setup(this, trackHotness);
 +        this.readMeter = tidy.global.readMeter;
 +    }
 +
 +    @VisibleForTesting
 +    public void overrideReadMeter(RestorableMeter readMeter)
 +    {
 +        this.readMeter = tidy.global.readMeter = readMeter;
 +    }
 +
 +    /**
 +     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
 +     * the globally shared tidy, i.e.
 +     *
 +     * InstanceTidier => DescriptorTypeTitdy => GlobalTidy
 +     *
 +     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
 +     * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable.
 +     *
 +     * When the InstanceTidier cleansup, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
 +     * for that type have run, the DescriptorTypeTidy cleansup. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
 +     *
 +     * For ease, we stash a direct reference to both our type-shared and global tidier
 +     */
 +    private static final class InstanceTidier implements Tidy
 +    {
 +        private final Descriptor descriptor;
 +        private final CFMetaData metadata;
 +        private IFilter bf;
 +        private IndexSummary summary;
 +
 +        private SegmentedFile dfile;
 +        private SegmentedFile ifile;
 +        private Runnable runOnClose;
 +        private boolean isReplaced = false;
 +
 +        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // we will release when we are ourselves released
 +        private Ref<DescriptorTypeTidy> typeRef;
 +
 +        // a convenience stashing of the shared per-descriptor-type tidy instance itself
 +        // and the per-logical-sstable globally shared state that it is linked to
 +        private DescriptorTypeTidy type;
 +        private GlobalTidy global;
 +
 +        private boolean setup;
 +
 +        void setup(SSTableReader reader, boolean trackHotness)
 +        {
 +            this.setup = true;
 +            this.bf = reader.bf;
 +            this.summary = reader.indexSummary;
 +            this.dfile = reader.dfile;
 +            this.ifile = reader.ifile;
 +            // get a new reference to the shared descriptor-type tidy
 +            this.typeRef = DescriptorTypeTidy.get(reader);
 +            this.type = typeRef.get();
 +            this.global = type.globalRef.get();
 +            if (trackHotness)
 +                global.ensureReadMeter();
 +        }
 +
 +        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
 +        {
 +            this.descriptor = descriptor;
 +            this.metadata = metadata;
 +        }
 +
 +        public void tidy()
 +        {
 +            // don't try to cleanup if the sstablereader was never fully constructed
 +            if (!setup)
 +                return;
 +
 +            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +            final OpOrder.Barrier barrier;
 +            if (cfs != null)
 +            {
 +                barrier = cfs.readOrdering.newBarrier();
 +                barrier.issue();
 +            }
 +            else
 +                barrier = null;
 +
 +            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (barrier != null)
 +                        barrier.await();
 +                    if (bf != null)
 +                        bf.close();
 +                    if (summary != null)
 +                        summary.close();
 +                    if (runOnClose != null)
 +                        runOnClose.run();
 +                    if (dfile != null)
 +                        dfile.close();
 +                    if (ifile != null)
 +                        ifile.close();
 +                    typeRef.release();
 +                }
 +            });
 +        }
 +
 +        public String name()
 +        {
 +            return descriptor.toString();
 +        }
 +
 +        void releaseSummary()
 +        {
 +            summary.close();
 +            assert summary.isCleanedUp();
 +            summary = null;
 +        }
 +    }
 +
 +    /**
 +     * One shared between all instances of a given Descriptor.Type.
 +     * Performs only two things: the deletion of the sstables for the type,
 +     * if necessary; and the shared reference to the globally shared state.
 +     *
 +     * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
 +     * and stash a reference to it to be released when they are. Once all such references are
 +     * released, the shared tidy will be performed.
 +     */
 +    static final class DescriptorTypeTidy implements Tidy
 +    {
 +        // keyed by REAL descriptor (TMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor
 +        static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>();
 +
 +        private final Descriptor desc;
 +        private final Ref<GlobalTidy> globalRef;
 +        private final Set<Component> components;
 +        private long sizeOnDelete;
 +        private Counter totalDiskSpaceUsed;
 +
 +        DescriptorTypeTidy(Descriptor desc, SSTableReader sstable)
 +        {
 +            this.desc = desc;
 +            // get a new reference to the shared global tidy
 +            this.globalRef = GlobalTidy.get(sstable);
 +            this.components = sstable.components;
 +        }
 +
 +        void markObsolete(SSTableReader instance, Tracker tracker)
 +        {
 +            // the tracker is used only to notify listeners of deletion of the sstable;
 +            // since deletion of a non-final file is not really deletion of the sstable,
 +            // we don't want to notify the listeners in this event
 +            if (tracker != null && tracker.cfstore != null && desc.type == Descriptor.Type.FINAL)
 +            {
 +                sizeOnDelete = instance.bytesOnDisk();
 +                totalDiskSpaceUsed = tracker.cfstore.metric.totalDiskSpaceUsed;
 +                tracker.notifyDeleting(instance);
 +            }
 +        }
 +
 +        public void tidy()
 +        {
 +            lookup.remove(desc);
 +            boolean isCompacted = globalRef.get().isCompacted.get();
 +            globalRef.release();
 +            switch (desc.type)
 +            {
 +                case FINAL:
 +                    if (isCompacted)
 +                        new SSTableDeletingTask(desc, components, totalDiskSpaceUsed, sizeOnDelete).run();
 +                    break;
 +                case TEMPLINK:
 +                    new SSTableDeletingTask(desc, components, null, 0).run();
 +                    break;
 +                default:
 +                    throw new IllegalStateException();
 +            }
 +        }
 +
 +        public String name()
 +        {
 +            return desc.toString();
 +        }
 +
 +        // get a new reference to the shared DescriptorTypeTidy for this sstable
 +        @SuppressWarnings("resource")
 +        public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
 +        {
 +            Descriptor desc = sstable.descriptor;
 +            if (sstable.openReason == OpenReason.EARLY)
 +                desc = desc.asType(Descriptor.Type.TEMPLINK);
 +            Ref<DescriptorTypeTidy> refc = lookup.get(desc);
 +            if (refc != null)
 +                return refc.ref();
 +            final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
 +            refc = new Ref<>(tidy, tidy);
 +            Ref<?> ex = lookup.put

<TRUNCATED>

[7/7] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by sn...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e63dacf7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e63dacf7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e63dacf7

Branch: refs/heads/cassandra-2.2
Commit: e63dacf793fedc8a9eed9c7fc635cde5f9fd68f3
Parents: 8b2dc1f e889ee4
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Sep 16 22:00:25 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Sep 16 22:00:25 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 193 ++++++++------
 .../org/apache/cassandra/cache/CacheKey.java    |  14 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  26 +-
 .../org/apache/cassandra/cache/KeyCacheKey.java |  19 +-
 .../org/apache/cassandra/cache/OHCProvider.java |  17 +-
 .../org/apache/cassandra/cache/RowCacheKey.java |  34 +--
 .../org/apache/cassandra/config/CFMetaData.java |   9 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 +-
 .../org/apache/cassandra/config/Schema.java     |  56 +++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  75 ++----
 src/java/org/apache/cassandra/db/Keyspace.java  |   4 -
 .../org/apache/cassandra/db/RowIndexEntry.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  30 +--
 .../io/sstable/format/SSTableReader.java        |  10 +-
 .../io/sstable/format/big/BigTableReader.java   |   2 +-
 .../apache/cassandra/service/CacheService.java  |  58 ++--
 .../cassandra/service/CassandraDaemon.java      |  41 ++-
 .../cassandra/service/StorageService.java       |  31 ++-
 .../org/apache/cassandra/utils/FBUtilities.java |  16 ++
 .../cassandra/cache/AutoSavingCacheTest.java    |   5 +-
 .../cassandra/cache/CacheProviderTest.java      |  17 +-
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  | 266 +++++++++++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   |  70 ++++-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  41 ++-
 26 files changed, 760 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7deebcf,207f16a..96ec0fa
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,5 +1,15 @@@
 -2.1.10
 +2.2.2
 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 + * Cancel transaction for sstables we wont redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 
 + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * Fix cache handling of 2i and base tables (CASSANDRA-10155)
   * Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
   * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
   * BATCH statement is broken in cqlsh (CASSANDRA-10272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index f0f4e8a,3ebbc76..3ec9d4e
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -61,8 -65,16 +67,16 @@@ public class AutoSavingCache<K extends 
      protected volatile ScheduledFuture<?> saveTask;
      protected final CacheService.CacheType cacheType;
  
 -    private CacheSerializer<K, V> cacheLoader;
 +    private final CacheSerializer<K, V> cacheLoader;
-     private static final String CURRENT_VERSION = "c";
+ 
+     /*
+      * CASSANDRA-10155 required a format change to fix 2i indexes and caching.
+      * 2.2 is already at version "c" and 3.0 is at "d".
+      *
+      * Since cache versions match exactly and there is no partial fallback just add
+      * a minor version letter.
+      */
 -    private static final String CURRENT_VERSION = "ba";
++    private static final String CURRENT_VERSION = "ca";
  
      private static volatile IStreamFactory streamFactory = new IStreamFactory()
      {
@@@ -90,16 -102,9 +104,14 @@@
          this.cacheLoader = cacheloader;
      }
  
-     public File getCacheDataPath(UUID cfId, String version)
 -    public File getCachePath(String version)
++    public File getCacheDataPath(String version)
      {
-         Pair<String, String> names = Schema.instance.getCF(cfId);
-         return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
 -        return DatabaseDescriptor.getSerializedCachePath(cacheType, version);
++        return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "db");
 +    }
 +
-     public File getCacheCrcPath(UUID cfId, String version)
++    public File getCacheCrcPath(String version)
 +    {
-         Pair<String, String> names = Schema.instance.getCF(cfId);
-         return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
++        return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "crc");
      }
  
      public Writer getWriter(int keysToSave)
@@@ -136,42 -170,65 +177,70 @@@
          long start = System.nanoTime();
  
          // modern format, allows both key and value (so key cache load can be purely sequential)
-         File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
-         File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
 -        File path = getCachePath(CURRENT_VERSION);
 -        if (path.exists())
++        File dataPath = getCacheDataPath(CURRENT_VERSION);
++        File crcPath = getCacheCrcPath(CURRENT_VERSION);
 +        if (dataPath.exists() && crcPath.exists())
          {
              DataInputStream in = null;
              try
              {
 -                logger.info(String.format("reading saved cache %s", path));
 -                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
 +                logger.info(String.format("reading saved cache %s", dataPath));
 +                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
-                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
 -
                  while (in.available() > 0)
                  {
-                     Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+                     //ksname and cfname are serialized by the serializers in CacheService
+                     //That is delegated there because there are serializer specific conditions
+                     //where a cache key is skipped and not written
+                     String ksname = in.readUTF();
+                     String cfname = in.readUTF();
+ 
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+ 
+                     Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                      // Key cache entry can return null, if the SSTable doesn't exist.
-                     if (entry == null)
+                     if (entryFuture == null)
                          continue;
-                     futures.add(entry);
+ 
+                     futures.offer(entryFuture);
                      count++;
+ 
+                     /*
+                      * Kind of unwise to accrue an unbounded number of pending futures
+                      * So now there is this loop to keep a bounded number pending.
+                      */
+                     do
+                     {
+                         while (futures.peek() != null && futures.peek().isDone())
+                         {
+                             Future<Pair<K, V>> future = futures.poll();
+                             Pair<K, V> entry = future.get();
+                             if (entry != null && entry.right != null)
+                                 put(entry.left, entry.right);
+                         }
+ 
+                         if (futures.size() > 1000)
+                             Thread.yield();
+                     } while(futures.size() > 1000);
                  }
  
-                 for (Future<Pair<K, V>> future : futures)
+                 Future<Pair<K, V>> future = null;
+                 while ((future = futures.poll()) != null)
                  {
                      Pair<K, V> entry = future.get();
                      if (entry != null && entry.right != null)
                          put(entry.left, entry.right);
                  }
              }
 +            catch (CorruptFileException e)
 +            {
 +                JVMStabilityInspector.inspectThrowable(e);
 +                logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
 +            }
-             catch (Exception e)
+             catch (Throwable t)
              {
-                 JVMStabilityInspector.inspectThrowable(e);
-                 logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
+                 JVMStabilityInspector.inspectThrowable(t);
 -                logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t);
++                logger.info(String.format("Harmless error reading saved cache %s", dataPath.getAbsolutePath()), t);
              }
              finally
              {
@@@ -236,11 -284,9 +305,10 @@@
          public CompactionInfo getCompactionInfo()
          {
              // keyset can change in size, thus total can too
 -            return info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
 +            // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
 +            return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
          }
  
-         @SuppressWarnings("resource")
          public void saveCache()
          {
              logger.debug("Deleting old {} files.", cacheType);
@@@ -254,37 -300,25 +322,26 @@@
  
              long start = System.nanoTime();
  
-             HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-             HashMap<UUID, OutputStream> streams = new HashMap<>();
-             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
- 
 -            DataOutputStreamPlus writer = null;
 -            File tempCacheFile = tempCacheFile();
++            WrappedDataOutputStreamPlus writer = null;
++            Pair<File, File> cacheFilePaths = tempCacheFiles();
              try
              {
+                 try
+                 {
 -                    writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile));
++                    writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right));
+                 }
+                 catch (FileNotFoundException e)
+                 {
+                     throw new RuntimeException(e);
+                 }
+ 
 -                for (K key : keys)
 +                while (keyIterator.hasNext())
                  {
 +                    K key = keyIterator.next();
-                     UUID cfId = key.getCFId();
-                     if (!Schema.instance.hasCF(key.getCFId()))
-                         continue; // the table has been dropped.
  
-                     DataOutputPlus writer = writers.get(cfId);
-                     if (writer == null)
-                     {
-                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                         OutputStream stream;
-                         try
-                         {
-                             stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
-                             writer = new WrappedDataOutputStreamPlus(stream);
-                         }
-                         catch (FileNotFoundException e)
-                         {
-                             throw new RuntimeException(e);
-                         }
-                         paths.put(cfId, cacheFilePaths);
-                         streams.put(cfId, stream);
-                         writers.put(cfId, writer);
-                     }
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+                     if (cfs == null)
+                         continue; // the table or 2i has been dropped.
  
                      try
                      {
@@@ -292,7 -326,7 +349,7 @@@
                      }
                      catch (IOException e)
                      {
-                         throw new FSWriteError(e, paths.get(cfId).left);
 -                        throw new FSWriteError(e, tempCacheFile);
++                        throw new FSWriteError(e, cacheFilePaths.left);
                      }
  
                      keysWritten++;
@@@ -302,49 -334,24 +359,31 @@@
              }
              finally
              {
-                 if (keyIterator instanceof Closeable)
-                     try
-                     {
-                         ((Closeable)keyIterator).close();
-                     }
-                     catch (IOException ignored)
-                     {
-                         // not thrown (by OHC)
-                     }
- 
-                 for (OutputStream writer : streams.values())
-                 {
+                 if (writer != null)
                      FileUtils.closeQuietly(writer);
-                 }
              }
  
-             for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
-             {
-                 UUID cfId = entry.getKey();
 -            File cacheFile = getCachePath(CURRENT_VERSION);
++            File cacheFile = getCacheDataPath(CURRENT_VERSION);
++            File crcFile = getCacheCrcPath(CURRENT_VERSION);
  
-                 Pair<File, File> tmpFiles = paths.get(cfId);
-                 File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
-                 File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
+             cacheFile.delete(); // ignore error if it didn't exist
++            crcFile.delete();
 +
-                 cacheFile.delete(); // ignore error if it didn't exist
-                 crcFile.delete();
++            if (!cacheFilePaths.left.renameTo(cacheFile))
++                logger.error("Unable to rename {} to {}", cacheFilePaths.left, cacheFile);
  
-                 if (!tmpFiles.left.renameTo(cacheFile))
-                     logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
- 
-                 if (!tmpFiles.right.renameTo(crcFile))
-                     logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
-             }
 -            if (!tempCacheFile.renameTo(cacheFile))
 -                logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile);
++            if (!cacheFilePaths.right.renameTo(crcFile))
++                logger.error("Unable to rename {} to {}", cacheFilePaths.right, crcFile);
  
 -            logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +            logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
          }
  
-         private Pair<File, File> tempCacheFiles(UUID cfId)
 -        private File tempCacheFile()
++        private Pair<File, File> tempCacheFiles()
          {
-             File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
-             File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
 -            File path = getCachePath(CURRENT_VERSION);
 -            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
++            File dataPath = getCacheDataPath(CURRENT_VERSION);
++            File crcPath = getCacheCrcPath(CURRENT_VERSION);
 +            return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
 +                               FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
          }
  
          private void deleteOldCacheFiles()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java
index e4cfb69,0000000..9b1c8cf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@@ -1,282 -1,0 +1,285 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cache;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.channels.WritableByteChannel;
 +import java.util.Iterator;
- import java.util.UUID;
 +
 +import com.google.common.base.Function;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.utils.Pair;
 +import org.caffinitas.ohc.OHCache;
 +import org.caffinitas.ohc.OHCacheBuilder;
 +
 +public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 +{
 +    public ICache<RowCacheKey, IRowCacheEntry> create()
 +    {
 +        OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
 +        builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024)
 +               .keySerializer(new KeySerializer())
 +               .valueSerializer(new ValueSerializer())
 +               .throwOOME(true);
 +
 +        return new OHCacheAdapter(builder.build());
 +    }
 +
 +    private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry>
 +    {
 +        private final OHCache<RowCacheKey, IRowCacheEntry> ohCache;
 +
 +        public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache)
 +        {
 +            this.ohCache = ohCache;
 +        }
 +
 +        public long capacity()
 +        {
 +            return ohCache.capacity();
 +        }
 +
 +        public void setCapacity(long capacity)
 +        {
 +            ohCache.setCapacity(capacity);
 +        }
 +
 +        public void put(RowCacheKey key, IRowCacheEntry value)
 +        {
 +            ohCache.put(key, value);
 +        }
 +
 +        public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
 +        {
 +            return ohCache.putIfAbsent(key, value);
 +        }
 +
 +        public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
 +        {
 +            return ohCache.addOrReplace(key, old, value);
 +        }
 +
 +        public IRowCacheEntry get(RowCacheKey key)
 +        {
 +            return ohCache.get(key);
 +        }
 +
 +        public void remove(RowCacheKey key)
 +        {
 +            ohCache.remove(key);
 +        }
 +
 +        public int size()
 +        {
 +            return (int) ohCache.size();
 +        }
 +
 +        public long weightedSize()
 +        {
 +            return ohCache.size();
 +        }
 +
 +        public void clear()
 +        {
 +            ohCache.clear();
 +        }
 +
 +        public Iterator<RowCacheKey> hotKeyIterator(int n)
 +        {
 +            return ohCache.hotKeyIterator(n);
 +        }
 +
 +        public Iterator<RowCacheKey> keyIterator()
 +        {
 +            return ohCache.keyIterator();
 +        }
 +
 +        public boolean containsKey(RowCacheKey key)
 +        {
 +            return ohCache.containsKey(key);
 +        }
 +    }
 +
 +    private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
 +    {
 +        public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
 +        {
-             dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits());
-             dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits());
++            dataOutput.writeUTF(rowCacheKey.ksAndCFName.left);
++            dataOutput.writeUTF(rowCacheKey.ksAndCFName.right);
 +            dataOutput.writeInt(rowCacheKey.key.length);
 +            dataOutput.write(rowCacheKey.key);
 +        }
 +
 +        public RowCacheKey deserialize(DataInput dataInput) throws IOException
 +        {
-             long msb = dataInput.readLong();
-             long lsb = dataInput.readLong();
++            String ksName = dataInput.readUTF();
++            String cfName = dataInput.readUTF();
 +            byte[] key = new byte[dataInput.readInt()];
 +            dataInput.readFully(key);
-             return new RowCacheKey(new UUID(msb, lsb), key);
++            return new RowCacheKey(Pair.create(ksName, cfName), key);
 +        }
 +
 +        public int serializedSize(RowCacheKey rowCacheKey)
 +        {
-             return 20 + rowCacheKey.key.length;
++            return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left)
++                    + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right)
++                    + 4
++                    + rowCacheKey.key.length;
 +        }
 +    }
 +
 +    private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry>
 +    {
 +        public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
 +        {
 +            assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
 +            boolean isSentinel = entry instanceof RowCacheSentinel;
 +            out.writeBoolean(isSentinel);
 +            if (isSentinel)
 +                out.writeLong(((RowCacheSentinel) entry).sentinelId);
 +            else
 +                ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version);
 +        }
 +
 +        public IRowCacheEntry deserialize(DataInput in) throws IOException
 +        {
 +            boolean isSentinel = in.readBoolean();
 +            if (isSentinel)
 +                return new RowCacheSentinel(in.readLong());
 +            return ColumnFamily.serializer.deserialize(in, MessagingService.current_version);
 +        }
 +
 +        public int serializedSize(IRowCacheEntry entry)
 +        {
 +            TypeSizes typeSizes = TypeSizes.NATIVE;
 +            int size = typeSizes.sizeof(true);
 +            if (entry instanceof RowCacheSentinel)
 +                size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
 +            else
 +                size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
 +            return size;
 +        }
 +    }
 +
 +    static class DataOutputPlusAdapter implements DataOutputPlus
 +    {
 +        private final DataOutput out;
 +
 +        public void write(byte[] b) throws IOException
 +        {
 +            out.write(b);
 +        }
 +
 +        public void write(byte[] b, int off, int len) throws IOException
 +        {
 +            out.write(b, off, len);
 +        }
 +
 +        public void write(int b) throws IOException
 +        {
 +            out.write(b);
 +        }
 +
 +        public void writeBoolean(boolean v) throws IOException
 +        {
 +            out.writeBoolean(v);
 +        }
 +
 +        public void writeByte(int v) throws IOException
 +        {
 +            out.writeByte(v);
 +        }
 +
 +        public void writeBytes(String s) throws IOException
 +        {
 +            out.writeBytes(s);
 +        }
 +
 +        public void writeChar(int v) throws IOException
 +        {
 +            out.writeChar(v);
 +        }
 +
 +        public void writeChars(String s) throws IOException
 +        {
 +            out.writeChars(s);
 +        }
 +
 +        public void writeDouble(double v) throws IOException
 +        {
 +            out.writeDouble(v);
 +        }
 +
 +        public void writeFloat(float v) throws IOException
 +        {
 +            out.writeFloat(v);
 +        }
 +
 +        public void writeInt(int v) throws IOException
 +        {
 +            out.writeInt(v);
 +        }
 +
 +        public void writeLong(long v) throws IOException
 +        {
 +            out.writeLong(v);
 +        }
 +
 +        public void writeShort(int v) throws IOException
 +        {
 +            out.writeShort(v);
 +        }
 +
 +        public void writeUTF(String s) throws IOException
 +        {
 +            out.writeUTF(s);
 +        }
 +
 +        public DataOutputPlusAdapter(DataOutput out)
 +        {
 +            this.out = out;
 +        }
 +
 +        public void write(ByteBuffer buffer) throws IOException
 +        {
 +            if (buffer.hasArray())
 +                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
 +            else
 +                throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +
 +        public void write(Memory memory, long offset, long length) throws IOException
 +        {
 +            throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +
 +        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
 +        {
 +            throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/RowCacheKey.java
index ccb85d8,c959fd1..e02db42
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@@ -33,20 -31,14 +31,20 @@@ public final class RowCacheKey extends 
  
      private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
  
-     public RowCacheKey(UUID cfId, byte[] key)
++    public RowCacheKey(Pair<String, String> ksAndCFName, byte[] key)
 +    {
-         this.cfId = cfId;
++        super(ksAndCFName);
 +        this.key = key;
 +    }
 +
-     public RowCacheKey(UUID cfId, DecoratedKey key)
+     public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key)
      {
-         this(cfId, key.getKey());
+         this(ksAndCFName, key.getKey());
      }
  
-     public RowCacheKey(UUID cfId, ByteBuffer key)
+     public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key)
      {
-         this.cfId = cfId;
+         super(ksAndCFName);
          this.key = ByteBufferUtil.getArray(key);
          assert this.key != null;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 6468973,2939f09..348eb89
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -47,11 -48,15 +47,12 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.io.compress.CompressionParameters;
  import org.apache.cassandra.io.compress.LZ4Compressor;
 -import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.serializers.MarshalException;
 -import org.apache.cassandra.thrift.CfDef;
 -import org.apache.cassandra.thrift.CqlResult;
 -import org.apache.cassandra.thrift.CqlRow;
 -import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.schema.LegacySchemaTables;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.UUIDGen;
  import org.github.jamm.Unmetered;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 545ad05,84381a0..c459b5d
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1480,20 -1431,17 +1480,11 @@@ public class DatabaseDescripto
          return conf.max_hint_window_in_ms;
      }
  
-     public static File getSerializedCachePath(String ksName,
-                                               String cfName,
-                                               UUID cfId,
-                                               CacheService.CacheType cacheType,
-                                               String version,
-                                               String extension)
 -    @Deprecated
 -    public static Integer getIndexInterval()
--    {
-         StringBuilder builder = new StringBuilder();
-         builder.append(ksName).append('-');
-         builder.append(cfName).append('-');
-         builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
-         builder.append(cacheType);
-         builder.append((version == null ? "" : "-" + version + "." + extension));
-         return new File(conf.saved_caches_directory, builder.toString());
 -        return conf.index_interval;
 -    }
 -
 -    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version)
++    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+     {
+         String name = cacheType.toString()
 -                + (version == null ? "" : "-" + version + ".db");
++                + (version == null ? "" : "-" + version + "." + extension);
+         return new File(conf.saved_caches_directory, name);
      }
  
      public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 548341e,fada670..00c9358
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -26,18 -28,14 +26,19 @@@ import com.google.common.collect.Sets
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.cql3.functions.Functions;
 +import org.apache.cassandra.cql3.functions.UDAggregate;
 +import org.apache.cassandra.cql3.functions.UDFunction;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.UserType;
+ import org.apache.cassandra.db.index.SecondaryIndex;
 -import org.apache.cassandra.db.index.SecondaryIndexManager;
  import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.schema.LegacySchemaTables;
  import org.apache.cassandra.service.MigrationManager;
  import org.apache.cassandra.utils.ConcurrentBiMap;
 -import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.Pair;
  import org.cliffc.high_scale_lib.NonBlockingHashMap;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 343ecee,ffaa276..a8a8910
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1665,9 -1655,9 +1626,9 @@@ public class ColumnFamilyStore implemen
      private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
      {
          assert isRowCacheEnabled()
 -               : String.format("Row cache is not enabled on column family [" + name + "]");
 +               : String.format("Row cache is not enabled on table [" + name + "]");
  
-         RowCacheKey key = new RowCacheKey(cfId, filter.key);
+         RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
  
          // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
          // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
@@@ -2075,23 -2026,19 +2036,23 @@@
      {
          Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
  
 -        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
 +        for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
 +             keyIter.hasNext(); )
          {
 +            RowCacheKey key = keyIter.next();
              DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-             if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+             if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                  invalidateCachedRow(dk);
          }
  
          if (metadata.isCounter())
          {
 -            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
 +            for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
 +                 keyIter.hasNext(); )
              {
 +                CounterCacheKey key = keyIter.next();
                  DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                 if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+                 if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                      CacheService.instance.counterCache.remove(key);
              }
          }
@@@ -2965,13 -2955,21 +2926,13 @@@
          }
      }
  
 -    /**
 -     * Returns the creation time of the oldest memtable not fully flushed yet.
 -     */
 -    public long oldestUnflushedMemtable()
 -    {
 -        return data.getView().getOldestMemtable().creationTime();
 -    }
 -
      public boolean isEmpty()
      {
 -        DataTracker.View view = data.getView();
 -        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
 +        View view = data.getView();
 +        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
      }
  
-     private boolean isRowCacheEnabled()
+     public boolean isRowCacheEnabled()
      {
          return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------


[2/7] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by sn...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6479d949
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6479d949
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6479d949

Branch: refs/heads/cassandra-2.2
Commit: 6479d9495ca54bb450f46f527a795b687aad3d49
Parents: a7282e4 3aff449
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed Sep 16 13:40:51 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Sep 16 13:40:51 2015 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------