You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/22 19:13:07 UTC

[1/2] allocate fixed index summary memory pool and resample cold index summaries to use less memory patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5519

Updated Branches:
  refs/heads/trunk 40598efa6 -> dbd1a727b


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
new file mode 100644
index 0000000..aac70ec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class IndexSummaryManagerTest extends SchemaLoader
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManagerTest.class);
+
+
+    private static long totalOffHeapSize(List<SSTableReader> sstables)
+    {
+        long total = 0;
+        for (SSTableReader sstable : sstables)
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        return total;
+    }
+
+    private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
+    {
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, originalOffHeapSize * sstables.size());
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+
+        return sstables;
+    }
+
+    private void validateData(ColumnFamilyStore cfs, int numRows)
+    {
+        for (int i = 0; i < numRows; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            QueryFilter filter = QueryFilter.getIdentityFilter(key, cfs.getColumnFamilyName(), System.currentTimeMillis());
+            ColumnFamily row = cfs.getColumnFamily(filter);
+            assertNotNull(row);
+            Column column = row.getColumn(ByteBufferUtil.bytes("column"));
+            assertNotNull(column);
+            assertEquals(100, column.value().array().length);
+        }
+    }
+
+    private Comparator<SSTableReader> hotnessComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate());
+        }
+    };
+
+    @Test
+    public void testRedistributeSummaries() throws IOException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numSSTables = 4;
+        int numRows = 256;
+        for (int sstable = 0; sstable < numSSTables; sstable++)
+        {
+            for (int row = 0; row < numRows; row++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(row));
+                RowMutation rm = new RowMutation(ksname, key.key);
+                rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        assertEquals(numSSTables, sstables.size());
+        validateData(cfs, numRows);
+
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // there should be enough space to not downsample anything
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables));
+        validateData(cfs, numRows);
+
+        // everything should get cut in half
+        assert sstables.size() == 4;
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // everything should get cut to a quarter
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to half
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables,(singleSummaryOffHeapSpace * (numSSTables / 2)));
+        assert sstables.size() == 4;
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to the original index summary
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // make two of the four sstables cold, only leave enough space for three full index summaries,
+        // so the two cold sstables should get downsampled to be half of their original size
+        sstables.get(0).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables.get(1).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // small increases or decreases in the read rate don't result in downsampling or upsampling
+        double lowerRate = 50.0 * (DOWNSAMPLE_THESHOLD + (DOWNSAMPLE_THESHOLD * 0.10));
+        double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10));
+        sstables.get(0).readMeter = new RestorableMeter(lowerRate, lowerRate);
+        sstables.get(1).readMeter = new RestorableMeter(higherRate, higherRate);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled
+        sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
+        sstables.get(0).readMeter = new RestorableMeter(1.0, 1.0);
+        sstables.get(1).readMeter = new RestorableMeter(2.0, 2.0);
+        sstables.get(2).readMeter = new RestorableMeter(1000.0, 1000.0);
+        sstables.get(3).readMeter = new RestorableMeter(1000.0, 1000.0);
+
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (singleSummaryOffHeapSpace * 3) + 50);
+        Collections.sort(sstables, hotnessComparator);
+
+        if (sstables.get(0).getIndexSummarySamplingLevel() == MIN_SAMPLING_LEVEL)
+            assertEquals(BASE_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel());
+        else
+            assertEquals(BASE_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel());
+
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+
+        // Cause a mix of upsampling and downsampling. We'll leave enough space for two full index summaries. The two
+        // coldest sstables will get downsampled to 8/128 of their size, leaving us with 1 and 112/128th index
+        // summaries worth of space.  The hottest sstable should get a full index summary, and the one in the middle
+        // should get the remainder.
+        sstables.get(0).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(1).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(2).readMeter = new RestorableMeter(100, 100);
+        sstables.get(3).readMeter = new RestorableMeter(128.0, 128.0);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (100.0 / BASE_SAMPLING_LEVEL))));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(MIN_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(MIN_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel());
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() > MIN_SAMPLING_LEVEL);
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() < BASE_SAMPLING_LEVEL);
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // Don't leave enough space for even the minimal index summaries
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 100);
+        for (SSTableReader sstable : sstables)
+            assertEquals(MIN_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+    }
+
+    @Test
+    public void testRebuildAtSamplingLevel() throws IOException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numRows = 256;
+        for (int row = 0; row < numRows; row++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(row));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableReader sstable = sstables.get(0);
+
+        for (int samplingLevel = MIN_SAMPLING_LEVEL; samplingLevel < BASE_SAMPLING_LEVEL; samplingLevel++)
+        {
+            sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel);
+            assertEquals(samplingLevel, sstable.getIndexSummarySamplingLevel());
+            int expectedSize = (numRows * samplingLevel) / (sstable.metadata.getIndexInterval() * BASE_SAMPLING_LEVEL);
+            assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
+        }
+    }
+
+    @Test
+    public void testJMXFunctions() throws IOException
+    {
+        IndexSummaryManager manager = IndexSummaryManager.instance;
+
+        // resize interval
+        assertNotNull(manager.getResizeIntervalInMinutes());
+        manager.setResizeIntervalInMinutes(-1);
+        assertNull(manager.getTimeToNextResize(TimeUnit.MINUTES));
+
+        manager.setResizeIntervalInMinutes(10);
+        assertEquals(10, manager.getResizeIntervalInMinutes());
+        assertEquals(10, manager.getTimeToNextResize(TimeUnit.MINUTES), 1);
+        manager.setResizeIntervalInMinutes(15);
+        assertEquals(15, manager.getResizeIntervalInMinutes());
+        assertEquals(15, manager.getTimeToNextResize(TimeUnit.MINUTES), 2);
+
+        // memory pool capacity
+        assertTrue(manager.getMemoryPoolCapacityInMB() >= 0);
+        manager.setMemoryPoolCapacityInMB(10);
+        assertEquals(10, manager.getMemoryPoolCapacityInMB());
+
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numSSTables = 2;
+        int numRows = 10;
+        for (int sstable = 0; sstable < numSSTables; sstable++)
+        {
+            for (int row = 0; row < numRows; row++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(row));
+                RowMutation rm = new RowMutation(ksname, key.key);
+                rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        assertEquals(1.0, manager.getAverageSamplingRatio(), 0.001);
+        Map<String, Double> samplingRatios = manager.getSamplingRatios();
+        for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+            assertEquals(1.0, entry.getValue(), 0.001);
+
+        manager.setMemoryPoolCapacityInMB(0);
+        manager.redistributeSummaries();
+        assertTrue(manager.getAverageSamplingRatio() < 0.99);
+        samplingRatios = manager.getSamplingRatios();
+        for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+        {
+            if (entry.getKey().contains("StandardLowIndexInterval"))
+                assertTrue(entry.getValue() < 0.9);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 8e73161..8d013f9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -23,22 +23,25 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static org.junit.Assert.assertArrayEquals;
+import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.downsample;
+import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.entriesAtSamplingLevel;
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+
+import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
 public class IndexSummaryTest
@@ -73,13 +76,13 @@ public class IndexSummaryTest
         Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1);
         ByteArrayOutputStream aos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(aos);
-        IndexSummary.serializer.serialize(random.right, dos);
+        IndexSummary.serializer.serialize(random.right, dos, false);
         // write junk
         dos.writeUTF("JUNK");
         dos.writeUTF("JUNK");
         FileUtils.closeQuietly(dos);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner());
+        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false);
         for (int i = 0; i < 100; i++)
             assertEquals(i, is.binarySearch(random.left.get(i)));
         // read the junk
@@ -92,7 +95,7 @@ public class IndexSummaryTest
     public void testAddEmptyKey() throws Exception
     {
         IPartitioner p = new RandomPartitioner();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1);
+        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
         builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
         IndexSummary summary = builder.build(p);
         assertEquals(1, summary.size());
@@ -101,9 +104,9 @@ public class IndexSummaryTest
 
         ByteArrayOutputStream aos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(aos);
-        IndexSummary.serializer.serialize(summary, dos);
+        IndexSummary.serializer.serialize(summary, dos, false);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p);
+        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false);
 
         assertEquals(1, loaded.size());
         assertEquals(summary.getPosition(0), loaded.getPosition(0));
@@ -113,7 +116,7 @@ public class IndexSummaryTest
     private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval)
     {
         List<DecoratedKey> list = Lists.newArrayList();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval);
+        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
         for (int i = 0; i < size; i++)
         {
             UUID uuid = UUID.randomUUID();
@@ -126,4 +129,128 @@ public class IndexSummaryTest
         IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
         return Pair.create(list, summary);
     }
-}
+
+    @Test
+    public void testDownsamplePatterns()
+    {
+        assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(0));
+        assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(1));
+
+        assertEquals(Arrays.asList(0, 1), Downsampling.getSamplingPattern(2));
+        assertEquals(Arrays.asList(0, 2, 1, 3), Downsampling.getSamplingPattern(4));
+        assertEquals(Arrays.asList(0, 4, 2, 6, 1, 5, 3, 7), Downsampling.getSamplingPattern(8));
+        assertEquals(Arrays.asList(0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15), Downsampling.getSamplingPattern(16));
+    }
+
+    private static boolean shouldSkip(int index, List<Integer> startPoints)
+    {
+        for (int start : startPoints)
+        {
+            if ((index - start) % BASE_SAMPLING_LEVEL == 0)
+                return true;
+        }
+        return false;
+    }
+
+    @Test
+    public void testDownsample()
+    {
+        final int NUM_KEYS = 4096;
+        final int INDEX_INTERVAL = 128;
+        final int ORIGINAL_NUM_ENTRIES = NUM_KEYS / INDEX_INTERVAL;
+
+
+        Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(NUM_KEYS, INDEX_INTERVAL);
+        List<DecoratedKey> keys = random.left;
+        IndexSummary original = random.right;
+
+        // sanity check on the original index summary
+        for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(original.getKey(i)));
+
+        List<Integer> samplePattern = Downsampling.getSamplingPattern(BASE_SAMPLING_LEVEL);
+
+        // downsample by one level, then two levels, then three levels...
+        int downsamplingRound = 1;
+        for (int samplingLevel = BASE_SAMPLING_LEVEL - 1; samplingLevel >= MIN_SAMPLING_LEVEL; samplingLevel--)
+        {
+            IndexSummary downsampled = downsample(original, samplingLevel, DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(samplingLevel, original.getMaxNumberOfEntries()), downsampled.size());
+
+            int sampledCount = 0;
+            List<Integer> skipStartPoints = samplePattern.subList(0, downsamplingRound);
+            for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            {
+                if (!shouldSkip(i, skipStartPoints))
+                {
+                    assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(downsampled.getKey(sampledCount)));
+                    sampledCount++;
+                }
+            }
+            downsamplingRound++;
+        }
+
+        // downsample one level each time
+        IndexSummary previous = original;
+        downsamplingRound = 1;
+        for (int downsampleLevel = BASE_SAMPLING_LEVEL - 1; downsampleLevel >= MIN_SAMPLING_LEVEL; downsampleLevel--)
+        {
+            IndexSummary downsampled = downsample(previous, downsampleLevel, DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(downsampleLevel, original.getMaxNumberOfEntries()), downsampled.size());
+
+            int sampledCount = 0;
+            List<Integer> skipStartPoints = samplePattern.subList(0, downsamplingRound);
+            for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            {
+                if (!shouldSkip(i, skipStartPoints))
+                {
+                    assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(downsampled.getKey(sampledCount)));
+                    sampledCount++;
+                }
+            }
+
+            previous = downsampled;
+            downsamplingRound++;
+        }
+    }
+
+    @Test
+    public void testOriginalIndexLookup()
+    {
+        for (int i = BASE_SAMPLING_LEVEL; i >= MIN_SAMPLING_LEVEL; i--)
+            assertEquals(i, Downsampling.getOriginalIndexes(i).size());
+
+        ArrayList<Integer> full = new ArrayList<>();
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            full.add(i);
+
+        assertEquals(full, Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL));
+        // the entry at index 0 is the first to go
+        assertEquals(full.subList(1, full.size()), Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL - 1));
+
+        // spot check a few values (these depend on BASE_SAMPLING_LEVEL being 128)
+        assert BASE_SAMPLING_LEVEL == 128;
+        assertEquals(Arrays.asList(31, 63, 95, 127), Downsampling.getOriginalIndexes(4));
+        assertEquals(Arrays.asList(63, 127), Downsampling.getOriginalIndexes(2));
+        assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0));
+    }
+
+    @Test
+    public void testGetNumberOfSkippedEntriesAfterIndex()
+    {
+        int indexInterval = 128;
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL, indexInterval));
+
+        // with one round of downsampling, only the first summary has been removed, so only the last index will have
+        // double the gap until the next sample
+        for (int i = 0; i < BASE_SAMPLING_LEVEL - 2; i++)
+            assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL - 1, indexInterval));
+        assertEquals(indexInterval * 2, Downsampling.getEffectiveIndexIntervalAfterIndex(BASE_SAMPLING_LEVEL - 2, BASE_SAMPLING_LEVEL - 1, indexInterval));
+
+        // at samplingLevel=2, the retained summary points are [63, 127] (assumes BASE_SAMPLING_LEVEL is 128)
+        assert BASE_SAMPLING_LEVEL == 128;
+        assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval));
+        assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index b771e72..6bf17fd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.junit.Assert;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -54,11 +57,17 @@ import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class SSTableReaderTest extends SchemaLoader
 {
+    private static final Logger logger = LoggerFactory.getLogger(SSTableReaderTest.class);
+
     static Token t(int i)
     {
         return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
@@ -252,8 +261,8 @@ public class SSTableReaderTest extends SchemaLoader
 
         // test to see if sstable can be opened as expected
         SSTableReader target = SSTableReader.open(desc);
-        Assert.assertEquals(target.getKeySampleSize(), 1);
-        Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getKeySample(0));
+        Assert.assertEquals(target.getIndexSummarySize(), 1);
+        Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getIndexSummaryKey(0));
         assert target.first.equals(firstKey);
         assert target.last.equals(lastKey);
     }
@@ -341,6 +350,64 @@ public class SSTableReaderTest extends SchemaLoader
         assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
     }
 
+    @Test
+    public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+        CompactionManager.instance.disableAutoCompaction();
+
+        final int NUM_ROWS = 1000;
+        for (int j = 0; j < NUM_ROWS; j++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add("StandardLowIndexInterval", ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store);
+
+        Collection<SSTableReader> sstables = store.getSSTables();
+        assert sstables.size() == 1;
+        final SSTableReader sstable = sstables.iterator().next();
+
+        ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
+        List<Future> futures = new ArrayList<>(NUM_ROWS * 2);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i));
+            final int index = i;
+
+            futures.add(executor.submit(new Runnable()
+            {
+                public void run()
+                {
+                    ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100, 100);
+                    assertFalse(result.isEmpty());
+                    assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(ByteBufferUtil.bytes("0")).value()));
+                }
+            }));
+
+            futures.add(executor.submit(new Runnable()
+            {
+                public void run()
+                {
+                    Iterable<DecoratedKey> results = store.keySamples(
+                            new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key)));
+                    assertTrue(results.iterator().hasNext());
+                }
+            }));
+        }
+
+        SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(Downsampling.MIN_SAMPLING_LEVEL);
+        store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement));
+        for (Future future : futures)
+            future.get();
+
+        assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
+    }
+
     private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws IOException
     {
         assert "Indexed1".equals(indexedCFS.name);


[2/2] git commit: allocate fixed index summary memory pool and resample cold index summaries to use less memory patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5519

Posted by jb...@apache.org.
allocate fixed index summary memory pool and resample cold index summaries to use less memory
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5519


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbd1a727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbd1a727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbd1a727

Branch: refs/heads/trunk
Commit: dbd1a727b7481a3dcd9867e3a6f7791c1095e12a
Parents: 40598ef
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 22 12:09:20 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 22 12:12:55 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 conf/cassandra.yaml                             |  14 +
 .../org/apache/cassandra/config/Config.java     |   3 +
 .../cassandra/config/DatabaseDescriptor.java    |  21 +
 .../org/apache/cassandra/db/DataTracker.java    |  37 +-
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../apache/cassandra/io/sstable/Descriptor.java |   7 +-
 .../cassandra/io/sstable/Downsampling.java      | 150 +++++++
 .../cassandra/io/sstable/IndexSummary.java      | 137 +++++-
 .../io/sstable/IndexSummaryBuilder.java         | 150 ++++++-
 .../io/sstable/IndexSummaryManager.java         | 434 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerMBean.java    |  48 ++
 .../cassandra/io/sstable/SSTableReader.java     | 236 +++++++---
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 .../cassandra/service/StorageService.java       |   4 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |   3 +-
 .../io/sstable/IndexSummaryManagerTest.java     | 327 ++++++++++++++
 .../cassandra/io/sstable/IndexSummaryTest.java  | 151 ++++++-
 .../cassandra/io/sstable/SSTableReaderTest.java |  79 +++-
 19 files changed, 1694 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79e5880..f54afe1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1
+ * allocate fixed index summary memory pool and resample cold index summaries 
+   to use less memory (CASSANDRA-5519)
  * Removed multithreaded compaction (CASSANDRA-6142)
  * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
  * change logging from log4j to logback (CASSANDRA-5883)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 53d091d..180fcd5 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -269,6 +269,20 @@ concurrent_writes: 32
 # the maximum number of secondary indexes created on a single CF.
 memtable_flush_queue_size: 4
 
+# A fixed memory pool size in MB for for SSTable index summaries. If left
+# empty, this will default to 5% of the heap size. If the memory usage of
+# all index summaries exceeds this limit, SSTables with low read rates will
+# shrink their index summaries in order to meet this limit.  However, this
+# is a best-effort process. In extreme conditions Cassandra may need to use
+# more than this amount of memory.
+index_summary_capacity_in_mb:
+
+# How frequently index summaries should be resampled.  This is done
+# periodically to redistribute memory from the fixed-size pool to sstables
+# proportional their recent read rates.  Setting to -1 will disable this
+# process, leaving existing index summaries at their current sampling level.
+index_summary_resize_interval_in_minutes: 60
+
 # Whether to, when doing sequential writing, fsync() at intervals in
 # order to force the operating system to flush the dirty
 # buffers. Enable this to avoid sudden dirty buffer flushing from

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f41a112..c48d652 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -183,6 +183,9 @@ public class Config
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public volatile Long index_summary_capacity_in_mb;
+    public volatile int index_summary_resize_interval_in_minutes = 60;
+
     public static boolean getOutboundBindAny()
     {
         return outboundBindAny;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d412032..3a4ab62 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.IAllocator;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -86,6 +87,7 @@ public class DatabaseDescriptor
 
     private static long keyCacheSizeInMB;
     private static IAllocator memoryAllocator;
+    private static long indexSummaryCapacityInMB;
 
     private static String localDC;
     private static Comparator<InetAddress> localComparator;
@@ -449,6 +451,15 @@ public class DatabaseDescriptor
                     + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
         }
 
+        // if set to empty/"auto" then use 5% of Heap size
+        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+            ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+            : conf.index_summary_capacity_in_mb;
+
+        if (indexSummaryCapacityInMB < 0)
+            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+                    + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.");
+
         memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
 
         if(conf.encryption_options != null)
@@ -1221,6 +1232,11 @@ public class DatabaseDescriptor
         return keyCacheSizeInMB;
     }
 
+    public static long getIndexSummaryCapacityInMB()
+    {
+        return indexSummaryCapacityInMB;
+    }
+
     public static int getKeyCacheSavePeriod()
     {
         return conf.key_cache_save_period;
@@ -1312,4 +1328,9 @@ public class DatabaseDescriptor
             throw new RuntimeException(e);
         }
     }
+
+    public static int getIndexSummaryResizeIntervalInMinutes()
+    {
+        return conf.index_summary_resize_interval_in_minutes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 1c25f44..64b088d 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -328,6 +328,41 @@ public class DataTracker
                           SSTableIntervalTree.empty()));
     }
 
+    /**
+     * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see
+     * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519).  This does not mark the old reader
+     * as compacted.
+     * @param oldSSTables replaced readers
+     * @param newSSTables replacement readers
+     */
+    public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables)
+    {
+        // data component will be unchanged but the index summary will be a different size
+        // (since we save that to make restart fast)
+        long sizeIncrease = 0;
+        for (SSTableReader sstable : oldSSTables)
+            sizeIncrease -= sstable.bytesOnDisk();
+        for (SSTableReader sstable : newSSTables)
+            sizeIncrease += sstable.bytesOnDisk();
+
+        View currentView, newView;
+        do
+        {
+            currentView = view.get();
+            newView = currentView.replace(oldSSTables, newSSTables);
+        }
+        while (!view.compareAndSet(currentView, newView));
+
+        StorageMetrics.load.inc(sizeIncrease);
+        cfstore.metric.liveDiskSpaceUsed.inc(sizeIncrease);
+
+        for (SSTableReader sstable : newSSTables)
+            sstable.setTrackedBy(this);
+
+        for (SSTableReader sstable : oldSSTables)
+            sstable.releaseReference();
+    }
+
     private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
     {
         if (!cfstore.isValid())
@@ -595,4 +630,4 @@ public class DataTracker
             return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b63caab..dac3d47 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -283,7 +283,7 @@ public abstract class AbstractCompactionStrategy
         else
         {
             // what percentage of columns do we expect to compact outside of overlap?
-            if (sstable.getKeySampleSize() < 2)
+            if (sstable.getIndexSummarySize() < 2)
             {
                 // we have too few samples to estimate correct percentage
                 return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 1b29c1c..fef6a1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -43,8 +43,8 @@ public class Descriptor
     // we always incremented the major version.
     public static class Version
     {
-        // This needs to be at the begining for initialization sake
-        public static final String current_version = "jb";
+        // This needs to be at the beginning for initialization sake
+        public static final String current_version = "jc";
 
         // ic (1.2.5): omits per-row bloom filter of column names
         // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -57,6 +57,7 @@ public class Descriptor
         //             tracks max/min column values (according to comparator)
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
         //             checksum the compressed data
+        // jc (2.1.0): index summaries can be downsampled and the sampling level is persisted
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -70,6 +71,7 @@ public class Descriptor
         public final boolean hasRowSizeAndColumnCount;
         public final boolean tracksMaxMinColumnNames;
         public final boolean hasPostCompressionAdlerChecksums;
+        public final boolean hasSamplingLevel;
 
         public Version(String version)
         {
@@ -82,6 +84,7 @@ public class Descriptor
             hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
             tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
+            hasSamplingLevel = version.compareTo("jc") >= 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/Downsampling.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Downsampling.java b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
new file mode 100644
index 0000000..62ca1be
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
@@ -0,0 +1,150 @@
+package org.apache.cassandra.io.sstable;
+
+import java.util.*;
+
+public class Downsampling
+{
+    /**
+     * The base (down)sampling level determines the granularity at which we can down/upsample.
+     *
+     * A higher number allows us to approximate more closely the ideal sampling.  (It could also mean we do a lot of
+     * expensive almost-no-op resamplings from N to N-1, but the thresholds in IndexSummaryManager prevent that.)
+     *
+     * BSL must be a power of two in order to have good sampling patterns. This cannot be changed without rebuilding
+     * all index summaries at full sampling; for now we treat it as a constant.
+     */
+    public static final int BASE_SAMPLING_LEVEL = 128;
+
+    /**
+     * The lowest level we will downsample to: the coarsest summary will have (MSL / BSL) entries left.
+     *
+     * This can be anywhere from 1 to the base sampling level.
+     */
+    public static final int MIN_SAMPLING_LEVEL = 8;
+
+    private static final Map<Integer, List<Integer>> samplePatternCache = new HashMap<>();
+
+    private static final Map<Integer, List<Integer>> originalIndexCache = new HashMap<>();
+
+    /**
+     * Gets a list L of starting indices for downsampling rounds: the first round should start with the offset
+     * given by L[0], the second by the offset in L[1], etc.
+     *
+     * @param samplingLevel the base sampling level
+     *
+     * @return A list of `samplingLevel` unique indices between 0 and `samplingLevel`
+     */
+    public static List<Integer> getSamplingPattern(int samplingLevel)
+    {
+        List<Integer> pattern = samplePatternCache.get(samplingLevel);
+        if (pattern != null)
+            return pattern;
+
+        if (samplingLevel <= 1)
+            return Arrays.asList(0);
+
+        ArrayList<Integer> startIndices = new ArrayList<>(samplingLevel);
+        startIndices.add(0);
+
+        int spread = samplingLevel;
+        while (spread >= 2)
+        {
+            ArrayList<Integer> roundIndices = new ArrayList<>(samplingLevel / spread);
+            for (int i = spread / 2; i < samplingLevel; i += spread)
+                roundIndices.add(i);
+
+            // especially for latter rounds, it's important that we spread out the start points, so we'll
+            // make a recursive call to get an ordering for this list of start points
+            List<Integer> roundIndicesOrdering = getSamplingPattern(roundIndices.size());
+            for (int i = 0; i < roundIndices.size(); ++i)
+                startIndices.add(roundIndices.get(roundIndicesOrdering.get(i)));
+
+            spread /= 2;
+        }
+
+        samplePatternCache.put(samplingLevel, startIndices);
+        return startIndices;
+    }
+
+    /**
+     * Returns a list that can be used to translate current index summary indexes to their original index before
+     * downsampling.  (This repeats every `samplingLevel`, so that's how many entries we return.)
+     *
+     * For example, if [7, 15] is returned, the current index summary entry at index 0 was originally
+     * at index 7, and the current index 1 was originally at index 15.
+     *
+     * @param samplingLevel the current sampling level for the index summary
+     *
+     * @return a list of original indexes for current summary entries
+     */
+    public static List<Integer> getOriginalIndexes(int samplingLevel)
+    {
+        List<Integer> originalIndexes = originalIndexCache.get(samplingLevel);
+        if (originalIndexes != null)
+            return originalIndexes;
+
+        List<Integer> pattern = getSamplingPattern(BASE_SAMPLING_LEVEL).subList(0, BASE_SAMPLING_LEVEL - samplingLevel);
+        originalIndexes = new ArrayList<>(samplingLevel);
+        for (int j = 0; j < BASE_SAMPLING_LEVEL; j++)
+        {
+            if (!pattern.contains(j))
+                originalIndexes.add(j);
+        }
+
+        originalIndexCache.put(samplingLevel, originalIndexes);
+        return originalIndexes;
+    }
+
+    /**
+     * Calculates the effective index interval after the entry at `index` in an IndexSummary.  In other words, this
+     * returns the number of partitions in the primary on-disk index before the next partition that has an entry in
+     * the index summary.  If samplingLevel == BASE_SAMPLING_LEVEL, this will be equal to the index interval.
+     * @param index an index into an IndexSummary
+     * @param samplingLevel the current sampling level for that IndexSummary
+     * @param indexInterval the index interval
+     * @return the number of partitions before the next index summary entry, inclusive on one end
+     */
+    public static int getEffectiveIndexIntervalAfterIndex(int index, int samplingLevel, int indexInterval)
+    {
+        assert index >= -1;
+        List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
+        if (index == -1)
+            return originalIndexes.get(0) * indexInterval;
+
+        index %= samplingLevel;
+        if (index == originalIndexes.size() - 1)
+        {
+            // account for partitions after the "last" entry as well as partitions before the "first" entry
+            return ((BASE_SAMPLING_LEVEL - originalIndexes.get(index)) + originalIndexes.get(0)) * indexInterval;
+        }
+        else
+        {
+            return (originalIndexes.get(index + 1) - originalIndexes.get(index)) * indexInterval;
+        }
+    }
+
+    public static int[] getStartPoints(int currentSamplingLevel, int newSamplingLevel)
+    {
+        List<Integer> allStartPoints = getSamplingPattern(BASE_SAMPLING_LEVEL);
+
+        // calculate starting indexes for sampling rounds
+        int initialRound = BASE_SAMPLING_LEVEL - currentSamplingLevel;
+        int numRounds = Math.abs(currentSamplingLevel - newSamplingLevel);
+        int[] startPoints = new int[numRounds];
+        for (int i = 0; i < numRounds; ++i)
+        {
+            int start = allStartPoints.get(initialRound + i);
+
+            // our "ideal" start points will be affected by the removal of items in earlier rounds, so go through all
+            // earlier rounds, and if we see an index that comes before our ideal start point, decrement the start point
+            int adjustment = 0;
+            for (int j = 0; j < initialRound; ++j)
+            {
+                if (allStartPoints.get(j) < start)
+                    adjustment++;
+            }
+            startPoints[i] = start - adjustment;
+        }
+        return startPoints;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index be7977e..4fc4737 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -23,6 +23,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -31,27 +34,52 @@ import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.io.util.MemoryOutputStream;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+/*
+ * Layout of Memory for index summaries:
+ *
+ * There are two sections:
+ *  1. A "header" containing the offset into `bytes` of entries in the summary summary data, consisting of
+ *     one four byte position for each entry in the summary.  This allows us do simple math in getIndex()
+ *     to find the position in the Memory to start reading the actual index summary entry.
+ *     (This is necessary because keys can have different lengths.)
+ *  2.  A sequence of (DecoratedKey, position) pairs, where position is the offset into the actual index file.
+ */
 public class IndexSummary implements Closeable
 {
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
+
     public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
     private final int indexInterval;
     private final IPartitioner partitioner;
-    private final int summary_size;
+    private final int summarySize;
+    private final int sizeAtFullSampling;
     private final Memory bytes;
 
-    public IndexSummary(IPartitioner partitioner, Memory memory, int summary_size, int indexInterval)
+    /**
+     * A value between MIN_SAMPLING_LEVEL and BASE_SAMPLING_LEVEL that represents how many of the original
+     * index summary entries ((1 / indexInterval) * numKeys) have been retained.
+     *
+     * Thus, this summary contains (samplingLevel / BASE_SAMPLING_LEVEL) * ((1 / indexInterval) * numKeys)) entries.
+     */
+    private final int samplingLevel;
+
+    public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling, int indexInterval, int samplingLevel)
     {
         this.partitioner = partitioner;
         this.indexInterval = indexInterval;
-        this.summary_size = summary_size;
+        this.summarySize = summarySize;
+        this.sizeAtFullSampling = sizeAtFullSampling;
         this.bytes = memory;
+        this.samplingLevel = samplingLevel;
     }
 
     // binary search is notoriously more difficult to get right than it looks; this is lifted from
     // Harmony's Collections implementation
     public int binarySearch(RowPosition key)
     {
-        int low = 0, mid = summary_size, high = mid - 1, result = -1;
+        int low = 0, mid = summarySize, high = mid - 1, result = -1;
         while (low <= high)
         {
             mid = (low + high) >> 1;
@@ -73,16 +101,21 @@ public class IndexSummary implements Closeable
         return -mid - (result < 0 ? 1 : 2);
     }
 
-    public int getIndex(int index)
+    /**
+     * Gets the position of the actual index summary entry in our Memory attribute, 'bytes'.
+     * @param index The index of the entry or key to get the position for
+     * @return an offset into our Memory attribute where the actual entry resides
+     */
+    public int getPositionInSummary(int index)
     {
-        // multiply by 4.
+        // The first section of bytes holds a four-byte position for each entry in the summary, so just multiply by 4.
         return bytes.getInt(index << 2);
     }
 
     public byte[] getKey(int index)
     {
-        long start = getIndex(index);
-        int keySize = (int) (caclculateEnd(index) - start - 8L);
+        long start = getPositionInSummary(index);
+        int keySize = (int) (calculateEnd(index) - start - 8L);
         byte[] key = new byte[keySize];
         bytes.getBytes(start, key, 0, keySize);
         return key;
@@ -90,12 +123,21 @@ public class IndexSummary implements Closeable
 
     public long getPosition(int index)
     {
-        return bytes.getLong(caclculateEnd(index) - 8);
+        return bytes.getLong(calculateEnd(index) - 8);
     }
 
-    private long caclculateEnd(int index)
+    public byte[] getEntry(int index)
     {
-        return index == (summary_size - 1) ? bytes.size() : getIndex(index + 1);
+        long start = getPositionInSummary(index);
+        long end = calculateEnd(index);
+        byte[] entry = new byte[(int)(end - start)];
+        bytes.getBytes(start, entry, 0, (int)(end - start));
+        return entry;
+    }
+
+    private long calculateEnd(int index)
+    {
+        return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
     }
 
     public int getIndexInterval()
@@ -105,27 +147,80 @@ public class IndexSummary implements Closeable
 
     public int size()
     {
-        return summary_size;
+        return summarySize;
+    }
+
+    public int getSamplingLevel()
+    {
+        return samplingLevel;
+    }
+
+    /**
+     * Returns the number of entries this summary would have if it were at the full sampling level, which is equal
+     * to the number of entries in the primary on-disk index divided by the index interval.
+     */
+    public int getMaxNumberOfEntries()
+    {
+        return sizeAtFullSampling;
+    }
+
+    /**
+     * Returns the amount of off-heap memory used for this summary.
+     * @return size in bytes
+     */
+    public long getOffHeapSize()
+    {
+        return bytes.size();
+    }
+
+    /**
+     * Returns the number of primary (on-disk) index entries between the index summary entry at `index` and the next
+     * index summary entry (assuming there is one).  Without any downsampling, this will always be equivalent to
+     * the index interval.
+     *
+     * @param index the index of an index summary entry (between zero and the index entry size)
+     *
+     * @return the number of partitions after `index` until the next partition with a summary entry
+     */
+    public int getEffectiveIndexIntervalAfterIndex(int index)
+    {
+        return Downsampling.getEffectiveIndexIntervalAfterIndex(index, samplingLevel, indexInterval);
     }
 
     public static class IndexSummarySerializer
     {
-        public void serialize(IndexSummary t, DataOutputStream out) throws IOException
+        public void serialize(IndexSummary t, DataOutputStream out, boolean withSamplingLevel) throws IOException
         {
             out.writeInt(t.indexInterval);
-            out.writeInt(t.summary_size);
+            out.writeInt(t.summarySize);
             out.writeLong(t.bytes.size());
+            if (withSamplingLevel)
+            {
+                out.writeInt(t.samplingLevel);
+                out.writeInt(t.sizeAtFullSampling);
+            }
             FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
         }
 
-        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner) throws IOException
+        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
         {
             int indexInterval = in.readInt();
-            int summary_size = in.readInt();
-            long offheap_size = in.readLong();
-            Memory memory = Memory.allocate(offheap_size);
-            FBUtilities.copy(in, new MemoryOutputStream(memory), offheap_size);
-            return new IndexSummary(partitioner, memory, summary_size, indexInterval);
+            int summarySize = in.readInt();
+            long offheapSize = in.readLong();
+            int samplingLevel, fullSamplingSummarySize;
+            if (haveSamplingLevel)
+            {
+                samplingLevel = in.readInt();
+                fullSamplingSummarySize = in.readInt();
+            }
+            else
+            {
+                samplingLevel = BASE_SAMPLING_LEVEL;
+                fullSamplingSummarySize = summarySize;
+            }
+            Memory memory = Memory.allocate(offheapSize);
+            FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
+            return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, indexInterval, samplingLevel);
         }
     }
 
@@ -134,4 +229,4 @@ public class IndexSummary implements Closeable
     {
         bytes.free();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index e7b9e11..3635e7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.util.ArrayList;
+import java.util.*;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
@@ -27,6 +27,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+
 public class IndexSummaryBuilder
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
@@ -34,12 +37,18 @@ public class IndexSummaryBuilder
     private final ArrayList<Long> positions;
     private final ArrayList<byte[]> keys;
     private final int indexInterval;
+    private final int samplingLevel;
+    private final int[] startPoints;
     private long keysWritten = 0;
+    private long indexIntervalMatches = 0;
     private long offheapSize = 0;
 
-    public IndexSummaryBuilder(long expectedKeys, int indexInterval)
+    public IndexSummaryBuilder(long expectedKeys, int indexInterval, int samplingLevel)
     {
         this.indexInterval = indexInterval;
+        this.samplingLevel = samplingLevel;
+        this.startPoints = Downsampling.getStartPoints(BASE_SAMPLING_LEVEL, samplingLevel);
+
         long expectedEntries = expectedKeys / indexInterval;
         if (expectedEntries > Integer.MAX_VALUE)
         {
@@ -50,19 +59,39 @@ public class IndexSummaryBuilder
             logger.warn("Index interval of {} is too low for {} expected keys; using interval of {} instead",
                         indexInterval, expectedKeys, effectiveInterval);
         }
-        positions = new ArrayList<Long>((int)expectedEntries);
-        keys = new ArrayList<byte[]>((int)expectedEntries);
+
+        // adjust our estimates based on the sampling level
+        expectedEntries = (expectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
+
+        positions = new ArrayList<>((int)expectedEntries);
+        keys = new ArrayList<>((int)expectedEntries);
     }
 
     public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
     {
         if (keysWritten % indexInterval == 0)
         {
-            byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
-            keys.add(key);
-            offheapSize += key.length;
-            positions.add(indexPosition);
-            offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
+            indexIntervalMatches++;
+
+            // see if we should skip this key based on our sampling level
+            boolean shouldSkip = false;
+            for (int start : startPoints)
+            {
+                if ((indexIntervalMatches - start) % BASE_SAMPLING_LEVEL == 0)
+                {
+                    shouldSkip = true;
+                    break;
+                }
+            }
+
+            if (!shouldSkip)
+            {
+                byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
+                keys.add(key);
+                offheapSize += key.length;
+                positions.add(indexPosition);
+                offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
+            }
         }
         keysWritten++;
 
@@ -74,21 +103,110 @@ public class IndexSummaryBuilder
         assert keys.size() > 0;
         assert keys.size() == positions.size();
 
+        // first we write out the position in the *summary* for each key in the summary,
+        // then we write out (key, actual index position) pairs
         Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
         int idxPosition = 0;
         int keyPosition = keys.size() * 4;
         for (int i = 0; i < keys.size(); i++)
         {
+            // write the position of the actual entry in the index summary (4 bytes)
+            memory.setInt(idxPosition, keyPosition);
+            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
+
+            // write the key
+            byte[] keyBytes = keys.get(i);
+            memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
+            keyPosition += keyBytes.length;
+
+            // write the position in the actual index file
+            long actualIndexPosition = positions.get(i);
+            memory.setLong(keyPosition, actualIndexPosition);
+            keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+        }
+        int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) indexInterval);
+        return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, indexInterval, samplingLevel);
+    }
+
+    public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
+    {
+        return (samplingLevel * maxSummarySize) / BASE_SAMPLING_LEVEL;
+    }
+
+    public static int calculateSamplingLevel(int currentSamplingLevel, int currentNumEntries, long targetNumEntries)
+    {
+        // Algebraic explanation for calculating the new sampling level (solve for newSamplingLevel):
+        // originalNumEntries = (baseSamplingLevel / currentSamplingLevel) * currentNumEntries
+        // newSpaceUsed = (newSamplingLevel / baseSamplingLevel) * originalNumEntries
+        // newSpaceUsed = (newSamplingLevel / baseSamplingLevel) * (baseSamplingLevel / currentSamplingLevel) * currentNumEntries
+        // newSpaceUsed = (newSamplingLevel / currentSamplingLevel) * currentNumEntries
+        // (newSpaceUsed * currentSamplingLevel) / currentNumEntries = newSamplingLevel
+        int newSamplingLevel = (int) (targetNumEntries * currentSamplingLevel) / currentNumEntries;
+        return Math.min(BASE_SAMPLING_LEVEL, Math.max(MIN_SAMPLING_LEVEL, newSamplingLevel));
+    }
+
+    /**
+     * Downsamples an existing index summary to a new sampling level.
+     * @param existing an existing IndexSummary
+     * @param newSamplingLevel the target level for the new IndexSummary.  This must be less than the current sampling
+     *                         level for `existing`.
+     * @param partitioner the partitioner used for the index summary
+     * @return a new IndexSummary
+     */
+    public static IndexSummary downsample(IndexSummary existing, int newSamplingLevel, IPartitioner partitioner)
+    {
+        // To downsample the old index summary, we'll go through (potentially) several rounds of downsampling.
+        // Conceptually, each round starts at position X and then removes every Nth item.  The value of X follows
+        // a particular pattern to evenly space out the items that we remove.  The value of N decreases by one each
+        // round.
+
+        int currentSamplingLevel = existing.getSamplingLevel();
+        assert currentSamplingLevel > newSamplingLevel;
+
+        // calculate starting indexes for downsampling rounds
+        int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
+
+        // calculate new off-heap size
+        int removedKeyCount = 0;
+        long newOffHeapSize = existing.getOffHeapSize();
+        for (int start : startPoints)
+        {
+            for (int j = start; j < existing.size(); j += currentSamplingLevel)
+            {
+                removedKeyCount++;
+                newOffHeapSize -= existing.getEntry(j).length;
+            }
+        }
+
+        int newKeyCount = existing.size() - removedKeyCount;
+
+        // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
+        // stores the position of the actual entries in the summary.
+        Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+
+        // Copy old entries to our new Memory.
+        int idxPosition = 0;
+        int keyPosition = newKeyCount * 4;
+        outer:
+        for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
+        {
+            // to determine if we can skip this entry, go through the starting points for our downsampling rounds
+            // and see if the entry's index is covered by that round
+            for (int start : startPoints)
+            {
+                if ((oldSummaryIndex - start) % currentSamplingLevel == 0)
+                    continue outer;
+            }
+
+            // write the position of the actual entry in the index summary (4 bytes)
             memory.setInt(idxPosition, keyPosition);
             idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
 
-            byte[] temp = keys.get(i);
-            memory.setBytes(keyPosition, temp, 0, temp.length);
-            keyPosition += temp.length;
-            long tempPosition = positions.get(i);
-            memory.setLong(keyPosition, tempPosition);
-            keyPosition += TypeSizes.NATIVE.sizeof(tempPosition);
+            // write the entry itself
+            byte[] entry = existing.getEntry(oldSummaryIndex);
+            memory.setBytes(keyPosition, entry, 0, entry.length);
+            keyPosition += entry.length;
         }
-        return new IndexSummary(partitioner, memory, keys.size(), indexInterval);
+        return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(), existing.getIndexInterval(), newSamplingLevel);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
new file mode 100644
index 0000000..1b36baf
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Manages the fixed-size memory pool for index summaries, periodically resizing them
+ * in order to give more memory to hot sstables and less memory to cold sstables.
+ */
+public class IndexSummaryManager implements IndexSummaryManagerMBean
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManager.class);
+    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=IndexSummaries";
+    public static final IndexSummaryManager instance;
+
+    private int resizeIntervalInMinutes = 0;
+    private long memoryPoolBytes;
+
+    // The target (or ideal) number of index summary entries must differ from the actual number of
+    // entries by this ratio in order to trigger an upsample or downsample of the summary.  Because
+    // upsampling requires reading the primary index in order to rebuild the summary, the threshold
+    // for upsampling is is higher.
+    static final double UPSAMPLE_THRESHOLD = 1.5;
+    static final double DOWNSAMPLE_THESHOLD = 0.75;
+
+    private final DebuggableScheduledThreadPoolExecutor executor;
+
+    // our next scheduled resizing run
+    private ScheduledFuture future;
+
+    static
+    {
+        instance = new IndexSummaryManager();
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+        try
+        {
+            mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private IndexSummaryManager()
+    {
+        executor = new DebuggableScheduledThreadPoolExecutor(1, "IndexSummaryManager", Thread.MIN_PRIORITY);
+
+        long indexSummarySizeInMB = DatabaseDescriptor.getIndexSummaryCapacityInMB();
+        int interval = DatabaseDescriptor.getIndexSummaryResizeIntervalInMinutes();
+        logger.info("Initializing index summary manager with a memory pool size of {} MB and a resize interval of {} minutes",
+                    indexSummarySizeInMB, interval);
+
+        setMemoryPoolCapacityInMB(DatabaseDescriptor.getIndexSummaryCapacityInMB());
+        setResizeIntervalInMinutes(DatabaseDescriptor.getIndexSummaryResizeIntervalInMinutes());
+    }
+
+    public int getResizeIntervalInMinutes()
+    {
+        return resizeIntervalInMinutes;
+    }
+
+    public void setResizeIntervalInMinutes(int resizeIntervalInMinutes)
+    {
+        int oldInterval = this.resizeIntervalInMinutes;
+        this.resizeIntervalInMinutes = resizeIntervalInMinutes;
+
+        long initialDelay;
+        if (future != null)
+        {
+            initialDelay = oldInterval < 0
+                           ? resizeIntervalInMinutes
+                           : Math.max(0, resizeIntervalInMinutes - (oldInterval - future.getDelay(TimeUnit.MINUTES)));
+            future.cancel(false);
+        }
+        else
+        {
+            initialDelay = resizeIntervalInMinutes;
+        }
+
+        if (this.resizeIntervalInMinutes < 0)
+        {
+            future = null;
+            return;
+        }
+
+        future = executor.scheduleWithFixedDelay(new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                redistributeSummaries();
+            }
+        }, initialDelay, resizeIntervalInMinutes, TimeUnit.MINUTES);
+    }
+
+    // for testing only
+    @VisibleForTesting
+    Long getTimeToNextResize(TimeUnit timeUnit)
+    {
+        if (future == null)
+            return null;
+
+        return future.getDelay(timeUnit);
+    }
+
+    public long getMemoryPoolCapacityInMB()
+    {
+        return memoryPoolBytes / 1024L / 1024L;
+    }
+
+    public Map<String, Double> getSamplingRatios()
+    {
+        List<SSTableReader> sstables = getAllSSTables();
+        Map<String, Double> ratios = new HashMap<>(sstables.size());
+        for (SSTableReader sstable : sstables)
+            ratios.put(sstable.getFilename(), sstable.getIndexSummarySamplingLevel() / (double) Downsampling.BASE_SAMPLING_LEVEL);
+
+        return ratios;
+    }
+
+    public double getAverageSamplingRatio()
+    {
+        List<SSTableReader> sstables = getAllSSTables();
+        double total = 0.0;
+        for (SSTableReader sstable : sstables)
+            total += sstable.getIndexSummarySamplingLevel() / (double) Downsampling.BASE_SAMPLING_LEVEL;
+        return total / sstables.size();
+    }
+
+    public void setMemoryPoolCapacityInMB(long memoryPoolCapacityInMB)
+    {
+        this.memoryPoolBytes = memoryPoolCapacityInMB * 1024L * 1024L;
+    }
+
+    /**
+     * Returns the actual space consumed by index summaries for all sstables.
+     * @return space currently used in MB
+     */
+    public double getMemoryPoolSizeInMB()
+    {
+        long total = 0;
+        for (SSTableReader sstable : getAllSSTables())
+            total += sstable.getIndexSummaryOffHeapSize();
+        return total / 1024.0 / 1024.0;
+    }
+
+    private List<SSTableReader> getAllSSTables()
+    {
+        List<SSTableReader> result = new ArrayList<>();
+        for (Keyspace ks : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
+                result.addAll(cfStore.getSSTables());
+        }
+
+        return result;
+    }
+
+    /**
+     * Returns a Pair of all compacting and non-compacting sstables.  Non-compacting sstables will be marked as
+     * compacting.
+     */
+    private Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> getCompactingAndNonCompactingSSTables()
+    {
+        List<SSTableReader> allCompacting = new ArrayList<>();
+        Multimap<DataTracker, SSTableReader> allNonCompacting = HashMultimap.create();
+        for (Keyspace ks : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
+            {
+                Set<SSTableReader> nonCompacting, allSSTables;
+                do
+                {
+                    allSSTables = cfStore.getDataTracker().getSSTables();
+                    nonCompacting = Sets.newHashSet(cfStore.getDataTracker().getUncompactingSSTables(allSSTables));
+                }
+                while (!(nonCompacting.isEmpty() || cfStore.getDataTracker().markCompacting(nonCompacting)));
+                allNonCompacting.putAll(cfStore.getDataTracker(), nonCompacting);
+                allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
+            }
+        }
+        return Pair.create(allCompacting, allNonCompacting);
+    }
+
+    public void redistributeSummaries() throws IOException
+    {
+        Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
+        try
+        {
+            redistributeSummaries(compactingAndNonCompacting.left, Lists.newArrayList(compactingAndNonCompacting.right.values()), this.memoryPoolBytes);
+        }
+        finally
+        {
+            for(DataTracker tracker : compactingAndNonCompacting.right.keySet())
+                tracker.unmarkCompacting(compactingAndNonCompacting.right.get(tracker));
+        }
+    }
+
+    /**
+     * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
+     * their recent read rates.
+     * @param nonCompacting a list of sstables to share the memory pool across
+     * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
+     *                        under, if possible
+     * @return a list of new SSTableReader instances
+     */
+    @VisibleForTesting
+    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
+    {
+        long total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+        double totalReadsPerSec = 0.0;
+        for (SSTableReader sstable : nonCompacting)
+        {
+            if (sstable.readMeter != null)
+            {
+                totalReadsPerSec += sstable.readMeter.fifteenMinuteRate();
+            }
+        }
+        logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+        // copy and sort by read rates (ascending)
+        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+        Collections.sort(sstablesByHotness, new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                if (o1.readMeter == null && o2.readMeter == null)
+                    return 0;
+                else if (o1.readMeter == null)
+                    return -1;
+                else if (o2.readMeter == null)
+                    return 1;
+                else
+                    return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate());
+            }
+        });
+
+        long remainingBytes = memoryPoolBytes;
+        for (SSTableReader sstable : compacting)
+            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+
+        total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
+            total += sstable.getIndexSummaryOffHeapSize();
+        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
+                     total / 1024.0 / 1024.0);
+
+        return newSSTables;
+    }
+
+    private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+                                                            double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+    {
+
+        List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+        List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+        List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+        // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+        // to the number of total reads/sec it handles.
+        long remainingSpace = memoryPoolCapacity;
+        for (SSTableReader sstable : sstables)
+        {
+            double readsPerSec = sstable.readMeter == null ? 0.0 : sstable.readMeter.fifteenMinuteRate();
+            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+            // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+            int currentNumEntries = sstable.getIndexSummarySize();
+            double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+            int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+            int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries);
+            int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+
+            logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving from level " +
+                         "{} ({} entries) to level {} ({} entries)",
+                         sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, newSamplingLevel, numEntriesAtNewSamplingLevel);
+
+            if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+            }
+            else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= spaceUsed;
+            }
+            else
+            {
+                // keep the same sampling level
+                logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                newSSTables.add(sstable);
+            }
+            totalReadsPerSec -= readsPerSec;
+        }
+
+        if (remainingSpace > 0)
+        {
+            Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+            toDownsample = result.right;
+            newSSTables.addAll(result.left);
+        }
+
+        // downsample first, then upsample
+        toDownsample.addAll(toUpsample);
+        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+        for (ResampleEntry entry : toDownsample)
+        {
+            SSTableReader sstable = entry.sstable;
+            logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(entry.newSamplingLevel);
+            DataTracker tracker = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()).getDataTracker();
+
+            replacedByTracker.put(tracker, sstable);
+            replacementsByTracker.put(tracker, replacement);
+        }
+
+        for (DataTracker tracker : replacedByTracker.keySet())
+        {
+            tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+            newSSTables.addAll(replacementsByTracker.get(tracker));
+        }
+
+        return newSSTables;
+    }
+
+    @VisibleForTesting
+    static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+    {
+        // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+        // that will make little difference.
+        Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+        {
+            public int compare(ResampleEntry o1, ResampleEntry o2)
+            {
+                return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                      o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+            }
+        });
+
+        int noDownsampleCutoff = 0;
+        List<SSTableReader> willNotDownsample = new ArrayList<>();
+        while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+        {
+            ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+            long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+            // see if we have enough leftover space to keep the current sampling level
+            if (extraSpaceRequired <= remainingSpace)
+            {
+                logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                             entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                willNotDownsample.add(entry.sstable);
+                remainingSpace -= extraSpaceRequired;
+            }
+            else
+            {
+                break;
+            }
+
+            noDownsampleCutoff++;
+        }
+        return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+    }
+
+    private static class ResampleEntry
+    {
+        public final SSTableReader sstable;
+        public final long newSpaceUsed;
+        public final int newSamplingLevel;
+
+        public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+        {
+            this.sstable = sstable;
+            this.newSpaceUsed = newSpaceUsed;
+            this.newSamplingLevel = newSamplingLevel;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
new file mode 100644
index 0000000..1e115cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface IndexSummaryManagerMBean
+{
+    public long getMemoryPoolCapacityInMB();
+    public void setMemoryPoolCapacityInMB(long memoryPoolCapacityInMB);
+
+    /**
+     * Returns the current actual off-heap memory usage of the index summaries for all non-compacting sstables.
+     * @return The amount of memory used in MB.
+     */
+    public double getMemoryPoolSizeInMB();
+
+    /**
+     * Returns a map of SSTable filenames to their current sampling ratio, where 1.0 indicates that all of the
+     * original index summary entries have been retained and 0.5 indicates that half of the original entries have
+     * been discarded.
+     * @return A map of SSTable filenames to their sampling ratios.
+     */
+    public Map<String, Double> getSamplingRatios();
+
+    public double getAverageSamplingRatio();
+
+    public void redistributeSummaries() throws IOException;
+
+    public int getResizeIntervalInMinutes();
+    public void setResizeIntervalInMinutes(int resizeIntervalInMinutes);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 369e3ea..f82af1f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -119,6 +119,8 @@ public class SSTableReader extends SSTable implements Closeable
     // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
     private final AtomicBoolean isSuspect = new AtomicBoolean(false);
+    private final AtomicBoolean isReplaced = new AtomicBoolean(false);
+
     private final SSTableDeletingTask deletingTask;
     // not final since we need to be able to change level on a file.
     private volatile SSTableMetadata sstableMetadata;
@@ -136,8 +138,8 @@ public class SSTableReader extends SSTable implements Closeable
 
         for (SSTableReader sstable : sstables)
         {
-            int indexKeyCount = sstable.getKeySampleSize();
-            count = count + (indexKeyCount + 1) * sstable.indexSummary.getIndexInterval();
+            // using getMaxIndexSummarySize() lets us ignore the current sampling level
+            count += (sstable.getMaxIndexSummarySize() + 1) * sstable.indexSummary.getSamplingLevel();
             if (logger.isDebugEnabled())
                 logger.debug("index size for bloom filter calc for file  : {}  : {}", sstable.getFilename(), count);
         }
@@ -192,7 +194,7 @@ public class SSTableReader extends SSTable implements Closeable
                                        ? new CompressedSegmentedFile.Builder()
                                        : new BufferedSegmentedFile.Builder();
         if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
-            sstable.buildSummary(false, ibuilder, dbuilder, false);
+            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 
@@ -412,11 +414,16 @@ public class SSTableReader extends SSTable implements Closeable
         if (readMeterSyncFuture != null)
             readMeterSyncFuture.cancel(false);
 
-        // Force finalizing mmapping if necessary
-        ifile.cleanup();
-        dfile.cleanup();
-        // close the BF so it can be opened later.
-        bf.close();
+        // if this SSTR was replaced by a new SSTR with a different index summary, the two instances will share
+        // resources, so don't force unmapping, clear the FileCacheService entry, or close the BF
+        if (!isReplaced.get())
+        {
+            // Force finalizing mmapping if necessary
+            ifile.cleanup();
+            dfile.cleanup();
+            // close the BF so it can be opened later.
+            bf.close();
+        }
         indexSummary.close();
     }
 
@@ -483,7 +490,7 @@ public class SSTableReader extends SSTable implements Closeable
 
         boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
         if (recreateBloomFilter || !summaryLoaded)
-            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
@@ -491,7 +498,7 @@ public class SSTableReader extends SSTable implements Closeable
             saveSummary(ibuilder, dbuilder);
     }
 
-    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
     {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
@@ -509,7 +516,7 @@ public class SSTableReader extends SSTable implements Closeable
 
             IndexSummaryBuilder summaryBuilder = null;
             if (!summaryLoaded)
-                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
+                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval(), samplingLevel);
 
             long indexPosition;
             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
@@ -555,7 +562,7 @@ public class SSTableReader extends SSTable implements Closeable
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner);
+            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel);
             if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
             {
                 iStream.close();
@@ -587,15 +594,20 @@ public class SSTableReader extends SSTable implements Closeable
 
     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
+        saveSummary(ibuilder, dbuilder, indexSummary);
+    }
+
+    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+    {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (summariesFile.exists())
-            summariesFile.delete();
+            FileUtils.deleteWithConfirm(summariesFile);
 
         DataOutputStream oStream = null;
         try
         {
             oStream = new DataOutputStream(new FileOutputStream(summariesFile));
-            IndexSummary.serializer.serialize(indexSummary, oStream);
+            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
             ByteBufferUtil.writeWithLength(first.key, oStream);
             ByteBufferUtil.writeWithLength(last.key, oStream);
             ibuilder.serializeBounds(oStream);
@@ -607,7 +619,7 @@ public class SSTableReader extends SSTable implements Closeable
 
             // corrupted hence delete it and let it load it now.
             if (summariesFile.exists())
-                summariesFile.delete();
+                FileUtils.deleteWithConfirm(summariesFile);
         }
         finally
         {
@@ -615,6 +627,81 @@ public class SSTableReader extends SSTable implements Closeable
         }
     }
 
+    /**
+     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
+     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+     * @return a new SSTableReader
+     * @throws IOException
+     */
+    public SSTableReader cloneWithNewSummarySamplingLevel(int samplingLevel) throws IOException
+    {
+        IndexSummary newSummary;
+        if (samplingLevel < indexSummary.getSamplingLevel())
+        {
+            newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, partitioner);
+
+            SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            SegmentedFile.Builder dbuilder = compression
+                                           ? SegmentedFile.getCompressedBuilder()
+                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+            saveSummary(ibuilder, dbuilder, newSummary);
+        }
+        else if (samplingLevel > indexSummary.getSamplingLevel())
+        {
+            newSummary = upsampleSummary(samplingLevel);
+        }
+        else
+        {
+            throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level");
+        }
+
+        markReplaced();
+        if (readMeterSyncFuture != null)
+            readMeterSyncFuture.cancel(false);
+
+        SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
+        replacement.readMeter = this.readMeter;
+        replacement.first = this.first;
+        replacement.last = this.last;
+        return replacement;
+    }
+
+    private IndexSummary upsampleSummary(int newSamplingLevel) throws IOException
+    {
+        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+        try
+        {
+            long indexSize = primaryIndex.length();
+            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getIndexInterval(), newSamplingLevel);
+
+            long indexPosition;
+            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            {
+                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+                RowIndexEntry.serializer.skip(primaryIndex);
+            }
+
+            return summaryBuilder.build(partitioner);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(primaryIndex);
+        }
+    }
+
+    public int getIndexSummarySamplingLevel()
+    {
+        return indexSummary.getSamplingLevel();
+    }
+
+    public long getIndexSummaryOffHeapSize()
+    {
+        return indexSummary.getOffHeapSize();
+    }
+
     public void releaseSummary() throws IOException
     {
         indexSummary.close();
@@ -627,22 +714,37 @@ public class SSTableReader extends SSTable implements Closeable
             throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
     }
 
-    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
+    /**
+     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
+     * modulo downsampling of the index summary).
+     */
     public long getIndexScanPosition(RowPosition key)
     {
-        int index = indexSummary.binarySearch(key);
-        if (index < 0)
+        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
+    }
+
+    public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+    {
+        if (binarySearchResult == -1)
+            return -1;
+        else
+            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
+    }
+
+    private static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
+    {
+        if (binarySearchResult < 0)
         {
             // binary search gives us the first index _greater_ than the key searched for,
             // i.e., its insertion position
-            int greaterThan = (index + 1) * -1;
+            int greaterThan = (binarySearchResult + 1) * -1;
             if (greaterThan == 0)
                 return -1;
-            return indexSummary.getPosition(greaterThan - 1);
+            return greaterThan - 1;
         }
         else
         {
-            return indexSummary.getPosition(index);
+            return binarySearchResult;
         }
     }
 
@@ -681,7 +783,7 @@ public class SSTableReader extends SSTable implements Closeable
      */
     public long estimatedKeys()
     {
-        return ((long) indexSummary.size()) * indexSummary.getIndexInterval();
+        return ((long) indexSummary.getMaxNumberOfEntries()) * indexSummary.getIndexInterval();
     }
 
     /**
@@ -694,20 +796,35 @@ public class SSTableReader extends SSTable implements Closeable
         List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
         for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
             sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
-        return Math.max(1, sampleKeyCount * indexSummary.getIndexInterval());
+
+        // adjust for the current sampling level
+        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getIndexInterval()) / indexSummary.getSamplingLevel();
+        return Math.max(1, estimatedKeys);
     }
 
     /**
-     * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
+     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
+     * the keys in this SSTable.
      */
-    public int getKeySampleSize()
+    public int getIndexSummarySize()
     {
         return indexSummary.size();
     }
 
-    public byte[] getKeySample(int position)
+    /**
+     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
+     */
+    public int getMaxIndexSummarySize()
     {
-        return indexSummary.getKey(position);
+        return indexSummary.getMaxNumberOfEntries();
+    }
+
+    /**
+     * Returns the key for the index summary entry at `index`.
+     */
+    public byte[] getIndexSummaryKey(int index)
+    {
+        return indexSummary.getKey(index);
     }
 
     private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
@@ -935,25 +1052,28 @@ public class SSTableReader extends SSTable implements Closeable
             }
         }
 
-        // next, see if the sampled index says it's impossible for the key to be present
-        long sampledPosition = getIndexScanPosition(key);
-        if (sampledPosition == -1)
+        // check the smallest and greatest keys in the sstable to see if it can't be present
+        if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
         {
             if (op == Operator.EQ && updateCacheAndStats)
                 bloomFilterTracker.addFalsePositive();
-            // we matched the -1th position: if the operator might match forward, we'll start at the first
-            // position. We however need to return the correct index entry for that first position.
-            if (op.apply(1) >= 0)
-            {
-                sampledPosition = 0;
-            }
-            else
+
+            if (op.apply(1) < 0)
             {
-                Tracing.trace("Partition summary allows skipping sstable {}", descriptor.generation);
+                Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
                 return null;
             }
         }
 
+        int binarySearchResult = indexSummary.binarySearch(key);
+        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
+        // if we matched the -1th position, we'll start at the first position
+        sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
+
+        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
         // scan the on-disk index, starting at the nearest sampled position.
         // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present
         // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
@@ -962,12 +1082,12 @@ public class SSTableReader extends SSTable implements Closeable
         // of the next interval).
         int i = 0;
         Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
-        while (segments.hasNext() && i <= indexSummary.getIndexInterval())
+        while (segments.hasNext() && i <= effectiveInterval)
         {
             FileDataInput in = segments.next();
             try
             {
-                while (!in.isEOF() && i <= indexSummary.getIndexInterval())
+                while (!in.isEOF() && i <= effectiveInterval)
                 {
                     i++;
 
@@ -1102,6 +1222,12 @@ public class SSTableReader extends SSTable implements Closeable
         return dfile.onDiskLength;
     }
 
+    public void markReplaced()
+    {
+        boolean success = isReplaced.compareAndSet(false, true);
+        assert success : "Attempted to mark an SSTableReader as replaced more than once";
+    }
+
     public boolean acquireReference()
     {
         while (true)
@@ -1121,19 +1247,27 @@ public class SSTableReader extends SSTable implements Closeable
      */
     public void releaseReference()
     {
-        if (references.decrementAndGet() == 0 && isCompacted.get())
+        if (references.decrementAndGet() == 0)
         {
-            /**
-             * Make OS a favour and suggest (using fadvice call) that we
-             * don't want to see pages of this SSTable in memory anymore.
-             *
-             * NOTE: We can't use madvice in java because it requires address of
-             * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
-             */
-            dropPageCache();
-
             FileUtils.closeQuietly(this);
-            deletingTask.schedule();
+
+            // if this SSTR instance was replaced by another with a different index summary, let the new instance
+            // handle clearing the page cache and deleting the files
+            if (isCompacted.get())
+            {
+                assert !isReplaced.get();
+
+                /**
+                 * Do the OS a favour and suggest (using fadvice call) that we
+                 * don't want to see pages of this SSTable in memory anymore.
+                 *
+                 * NOTE: We can't use madvice in java because it requires the address of
+                 * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+                 */
+                dropPageCache();
+
+                deletingTask.schedule();
+            }
         }
         assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b5d50cf..5b02d37 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -414,7 +414,7 @@ public class SSTableWriter extends SSTable
             indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
                                               !metadata.populateIoCacheOnFlush());
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-            summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval());
+            summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 3410e9b..5dabb42 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -487,10 +487,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         initialized = true;
         isClientMode = false;
 
-        // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
         try
         {
+            // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
             Class.forName("org.apache.cassandra.service.StorageProxy");
+            // also IndexSummaryManager, which is otherwise unreferenced
+            Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager");
         }
         catch (ClassNotFoundException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index ef20e64..1c13942 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -202,7 +202,8 @@ public class SchemaLoader
                                                                                .compactionStrategyOptions(leveledOptions),
                                            standardCFMD(ks1, "legacyleveled")
                                                                                .compactionStrategyClass(LeveledCompactionStrategy.class)
-                                                                               .compactionStrategyOptions(leveledOptions)));
+                                                                               .compactionStrategyOptions(leveledOptions),
+                                           standardCFMD(ks1, "StandardLowIndexInterval").indexInterval(8).caching(CFMetaData.Caching.NONE)));
 
         // Keyspace 2
         schema.add(KSMetaData.testMetadata(ks2,