You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/22 19:13:07 UTC
[1/2] allocate fixed index summary memory pool and resample cold
index summaries to use less memory patch by Tyler Hobbs;
reviewed by jbellis for CASSANDRA-5519
Updated Branches:
refs/heads/trunk 40598efa6 -> dbd1a727b
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
new file mode 100644
index 0000000..aac70ec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
+import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class IndexSummaryManagerTest extends SchemaLoader
+{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManagerTest.class);
+
+
+    /**
+     * Adds up the off-heap index summary size reported by each of the given sstables.
+     *
+     * @param sstables readers whose summaries are totalled; an empty list yields 0
+     * @return the sum of {@code getIndexSummaryOffHeapSize()} over {@code sstables}
+     */
+    private static long totalOffHeapSize(List<SSTableReader> sstables)
+    {
+        long sum = 0L;
+        Iterator<SSTableReader> readers = sstables.iterator();
+        while (readers.hasNext())
+            sum += readers.next().getIndexSummaryOffHeapSize();
+        return sum;
+    }
+
+    /**
+     * Gives every sstable the same read meter ({@code new RestorableMeter(100.0, 100.0)}), redistributes the
+     * summaries with a pool of {@code originalOffHeapSize * sstables.size()}, and asserts that every resulting
+     * summary is at {@code BASE_SAMPLING_LEVEL}.
+     *
+     * @param sstables            readers to reset; their {@code readMeter} fields are overwritten
+     * @param originalOffHeapSize off-heap size of one summary, used to size the pool
+     * @return the list returned by {@code redistributeSummaries}
+     * @throws IOException propagated from {@code redistributeSummaries}
+     */
+    private static List<SSTableReader> resetSummaries(List<SSTableReader> sstables, long originalOffHeapSize) throws IOException
+    {
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        // a typed empty list avoids the unchecked conversion that the raw Collections.EMPTY_LIST needs
+        List<SSTableReader> noReaders = Collections.emptyList();
+        List<SSTableReader> redistributed = redistributeSummaries(noReaders, sstables, originalOffHeapSize * sstables.size());
+        for (SSTableReader sstable : redistributed)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+
+        return redistributed;
+    }
+
+    /**
+     * Reads rows {@code "0"} to {@code numRows - 1} back from {@code cfs} and asserts that each one exists and
+     * has a column named {@code "column"} whose value is 100 bytes long.
+     *
+     * @param cfs     column family store to query
+     * @param numRows number of consecutive integer keys, starting at 0, expected to be readable
+     */
+    private void validateData(ColumnFamilyStore cfs, int numRows)
+    {
+        for (int i = 0; i < numRows; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            QueryFilter filter = QueryFilter.getIdentityFilter(key, cfs.getColumnFamilyName(), System.currentTimeMillis());
+            ColumnFamily row = cfs.getColumnFamily(filter);
+            assertNotNull(row);
+            Column column = row.getColumn(ByteBufferUtil.bytes("column"));
+            assertNotNull(column);
+            // remaining() is the readable length of the value; array().length would measure the whole backing
+            // array (wrong for a slice of a larger buffer) and throws for direct or read-only buffers
+            assertEquals(100, column.value().remaining());
+        }
+    }
+
+    /**
+     * Orders sstables by ascending {@code readMeter.fifteenMinuteRate()}, so the coldest reader sorts first.
+     * The comparator holds no state, so one shared constant is enough.
+     */
+    private static final Comparator<SSTableReader> hotnessComparator = new Comparator<SSTableReader>()
+    {
+        @Override
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate());
+        }
+    };
+
+    /**
+     * Writes four sstables of 256 rows each and drives {@code redistributeSummaries} through a series of memory
+     * pool sizes and read rates, asserting the resulting sampling level of every summary and re-reading all
+     * rows after each step.
+     *
+     * @throws IOException propagated from {@code redistributeSummaries}
+     */
+    @Test
+    public void testRedistributeSummaries() throws IOException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numSSTables = 4;
+        int numRows = 256;
+        for (int sstable = 0; sstable < numSSTables; sstable++)
+        {
+            for (int row = 0; row < numRows; row++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(row));
+                RowMutation rm = new RowMutation(ksname, key.key);
+                rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        assertEquals(numSSTables, sstables.size());
+        validateData(cfs, numRows);
+
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // a typed empty list avoids the unchecked conversion that the raw Collections.EMPTY_LIST needs
+        List<SSTableReader> noReaders = Collections.emptyList();
+
+        // there should be enough space to not downsample anything
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        assertEquals(singleSummaryOffHeapSpace * numSSTables, totalOffHeapSize(sstables));
+        validateData(cfs, numRows);
+
+        // everything should get cut in half
+        // (JUnit assertion instead of the assert keyword, which is skipped when the JVM runs without -ea)
+        assertEquals(numSSTables, sstables.size());
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // everything should get cut to a quarter
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * (numSSTables / 4)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to half
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+        assertEquals(numSSTables, sstables.size());
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to the original index summary
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // make two of the four sstables cold, only leave enough space for three full index summaries,
+        // so the two cold sstables should get downsampled to be half of their original size
+        sstables.get(0).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables.get(1).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // small increases or decreases in the read rate don't result in downsampling or upsampling
+        double lowerRate = 50.0 * (DOWNSAMPLE_THESHOLD + (DOWNSAMPLE_THESHOLD * 0.10));
+        double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 0.10));
+        sstables.get(0).readMeter = new RestorableMeter(lowerRate, lowerRate);
+        sstables.get(1).readMeter = new RestorableMeter(higherRate, higherRate);
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // reset, and then this time, leave enough space for one of the cold sstables to not get downsampled
+        sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
+        sstables.get(0).readMeter = new RestorableMeter(1.0, 1.0);
+        sstables.get(1).readMeter = new RestorableMeter(2.0, 2.0);
+        sstables.get(2).readMeter = new RestorableMeter(1000.0, 1000.0);
+        sstables.get(3).readMeter = new RestorableMeter(1000.0, 1000.0);
+
+        sstables = redistributeSummaries(noReaders, sstables, (singleSummaryOffHeapSpace * 3) + 50);
+        Collections.sort(sstables, hotnessComparator);
+
+        // either of the two cold sstables may be the one that keeps its full summary
+        if (sstables.get(0).getIndexSummarySamplingLevel() == MIN_SAMPLING_LEVEL)
+            assertEquals(BASE_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel());
+        else
+            assertEquals(BASE_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel());
+
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+
+        // Cause a mix of upsampling and downsampling. We'll leave enough space for two full index summaries. The two
+        // coldest sstables will get downsampled to 8/128 of their size, leaving us with 1 and 112/128th index
+        // summaries worth of space. The hottest sstable should get a full index summary, and the one in the middle
+        // should get the remainder.
+        // NOTE(review): the pool passed below is 1 + 100/BASE_SAMPLING_LEVEL summaries, not the two full
+        // summaries described above - confirm which figure was intended.
+        sstables.get(0).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(1).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(2).readMeter = new RestorableMeter(100, 100);
+        sstables.get(3).readMeter = new RestorableMeter(128.0, 128.0);
+        sstables = redistributeSummaries(noReaders, sstables, (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (100.0 / BASE_SAMPLING_LEVEL))));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(MIN_SAMPLING_LEVEL, sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(MIN_SAMPLING_LEVEL, sstables.get(1).getIndexSummarySamplingLevel());
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() > MIN_SAMPLING_LEVEL);
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() < BASE_SAMPLING_LEVEL);
+        assertEquals(BASE_SAMPLING_LEVEL, sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // Don't leave enough space for even the minimal index summaries
+        sstables = redistributeSummaries(noReaders, sstables, 100);
+        for (SSTableReader sstable : sstables)
+            assertEquals(MIN_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+    }
+
+    /**
+     * Flushes a single 256-row sstable, then re-clones it at each sampling level from
+     * {@code MIN_SAMPLING_LEVEL} up to (but not including) {@code BASE_SAMPLING_LEVEL}, asserting the reported
+     * level and that the summary size is within one entry of the proportional estimate.
+     *
+     * @throws IOException declared by the test signature
+     */
+    @Test
+    public void testRebuildAtSamplingLevel() throws IOException
+    {
+        final String ksname = "Keyspace1";
+        final String cfname = "StandardLowIndexInterval";
+        ColumnFamilyStore cfs = Keyspace.open(ksname).getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        final int numRows = 256;
+        ByteBuffer payload = ByteBuffer.wrap(new byte[100]);
+        int written = 0;
+        while (written < numRows)
+        {
+            DecoratedKey rowKey = Util.dk(String.valueOf(written));
+            RowMutation mutation = new RowMutation(ksname, rowKey.key);
+            mutation.add(cfname, ByteBufferUtil.bytes("column"), payload, 0);
+            mutation.apply();
+            written++;
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> flushed = new ArrayList<>(cfs.getSSTables());
+        assertEquals(1, flushed.size());
+
+        // each pass re-clones the reader produced by the previous pass
+        SSTableReader current = flushed.get(0);
+        int level = MIN_SAMPLING_LEVEL;
+        while (level < BASE_SAMPLING_LEVEL)
+        {
+            current = current.cloneWithNewSummarySamplingLevel(level);
+            assertEquals(level, current.getIndexSummarySamplingLevel());
+
+            int fullIntervalSpan = current.metadata.getIndexInterval() * BASE_SAMPLING_LEVEL;
+            int expectedSize = (numRows * level) / fullIntervalSpan;
+            assertEquals(expectedSize, current.getIndexSummarySize(), 1);
+            level++;
+        }
+    }
+
+    /**
+     * Exercises the management getters and setters on {@code IndexSummaryManager.instance}: the resize
+     * interval, the memory pool capacity, and the sampling ratios before and after the pool is shrunk to 0 MB.
+     * The settings found on the shared instance at the start are put back when the test finishes, so they do
+     * not leak into other tests.
+     *
+     * @throws IOException propagated from {@code redistributeSummaries}
+     */
+    @Test
+    public void testJMXFunctions() throws IOException
+    {
+        IndexSummaryManager manager = IndexSummaryManager.instance;
+
+        // NOTE(review): assumes the getters return int and long respectively - confirm against the MBean
+        int originalResizeInterval = manager.getResizeIntervalInMinutes();
+        long originalPoolCapacity = manager.getMemoryPoolCapacityInMB();
+        try
+        {
+            // resize interval
+            assertNotNull(manager.getResizeIntervalInMinutes());
+            manager.setResizeIntervalInMinutes(-1);
+            assertNull(manager.getTimeToNextResize(TimeUnit.MINUTES));
+
+            manager.setResizeIntervalInMinutes(10);
+            assertEquals(10, manager.getResizeIntervalInMinutes());
+            assertEquals(10, manager.getTimeToNextResize(TimeUnit.MINUTES), 1);
+            manager.setResizeIntervalInMinutes(15);
+            assertEquals(15, manager.getResizeIntervalInMinutes());
+            assertEquals(15, manager.getTimeToNextResize(TimeUnit.MINUTES), 2);
+
+            // memory pool capacity
+            assertTrue(manager.getMemoryPoolCapacityInMB() >= 0);
+            manager.setMemoryPoolCapacityInMB(10);
+            assertEquals(10, manager.getMemoryPoolCapacityInMB());
+
+            String ksname = "Keyspace1";
+            String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+            Keyspace keyspace = Keyspace.open(ksname);
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+            cfs.truncateBlocking();
+            cfs.disableAutoCompaction();
+
+            ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+            int numSSTables = 2;
+            int numRows = 10;
+            for (int sstable = 0; sstable < numSSTables; sstable++)
+            {
+                for (int row = 0; row < numRows; row++)
+                {
+                    DecoratedKey key = Util.dk(String.valueOf(row));
+                    RowMutation rm = new RowMutation(ksname, key.key);
+                    rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                    rm.apply();
+                }
+                cfs.forceBlockingFlush();
+            }
+
+            assertEquals(1.0, manager.getAverageSamplingRatio(), 0.001);
+            Map<String, Double> samplingRatios = manager.getSamplingRatios();
+            for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+                assertEquals(1.0, entry.getValue(), 0.001);
+
+            // with a 0 MB pool the summaries of the table written above must shrink
+            manager.setMemoryPoolCapacityInMB(0);
+            manager.redistributeSummaries();
+            assertTrue(manager.getAverageSamplingRatio() < 0.99);
+            samplingRatios = manager.getSamplingRatios();
+            for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+            {
+                if (entry.getKey().contains("StandardLowIndexInterval"))
+                    assertTrue(entry.getValue() < 0.9);
+            }
+        }
+        finally
+        {
+            // put the shared instance back the way it was found
+            manager.setMemoryPoolCapacityInMB(originalPoolCapacity);
+            manager.setResizeIntervalInMinutes(originalResizeInterval);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 8e73161..8d013f9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -23,22 +23,25 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
-import static org.junit.Assert.assertArrayEquals;
+import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.downsample;
+import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.entriesAtSamplingLevel;
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+
+import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
public class IndexSummaryTest
@@ -73,13 +76,13 @@ public class IndexSummaryTest
Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1);
ByteArrayOutputStream aos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(aos);
- IndexSummary.serializer.serialize(random.right, dos);
+ IndexSummary.serializer.serialize(random.right, dos, false);
// write junk
dos.writeUTF("JUNK");
dos.writeUTF("JUNK");
FileUtils.closeQuietly(dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
- IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner());
+ IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false);
for (int i = 0; i < 100; i++)
assertEquals(i, is.binarySearch(random.left.get(i)));
// read the junk
@@ -92,7 +95,7 @@ public class IndexSummaryTest
public void testAddEmptyKey() throws Exception
{
IPartitioner p = new RandomPartitioner();
- IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1);
+ IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
IndexSummary summary = builder.build(p);
assertEquals(1, summary.size());
@@ -101,9 +104,9 @@ public class IndexSummaryTest
ByteArrayOutputStream aos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(aos);
- IndexSummary.serializer.serialize(summary, dos);
+ IndexSummary.serializer.serialize(summary, dos, false);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
- IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p);
+ IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false);
assertEquals(1, loaded.size());
assertEquals(summary.getPosition(0), loaded.getPosition(0));
@@ -113,7 +116,7 @@ public class IndexSummaryTest
private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval)
{
List<DecoratedKey> list = Lists.newArrayList();
- IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval);
+ IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
for (int i = 0; i < size; i++)
{
UUID uuid = UUID.randomUUID();
@@ -126,4 +129,128 @@ public class IndexSummaryTest
IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
return Pair.create(list, summary);
}
-}
+
+    /**
+     * Pins the exact ordering returned by {@code Downsampling.getSamplingPattern} for levels 0, 1, 2, 4, 8
+     * and 16.
+     */
+    @Test
+    public void testDownsamplePatterns()
+    {
+        // levels 0 and 1 both yield the single-element pattern
+        List<Integer> trivial = Arrays.asList(0);
+        for (int level = 0; level <= 1; level++)
+            assertEquals(trivial, Downsampling.getSamplingPattern(level));
+
+        List<Integer> forTwo = Arrays.asList(0, 1);
+        List<Integer> forFour = Arrays.asList(0, 2, 1, 3);
+        List<Integer> forEight = Arrays.asList(0, 4, 2, 6, 1, 5, 3, 7);
+        List<Integer> forSixteen = Arrays.asList(0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15);
+
+        assertEquals(forTwo, Downsampling.getSamplingPattern(2));
+        assertEquals(forFour, Downsampling.getSamplingPattern(4));
+        assertEquals(forEight, Downsampling.getSamplingPattern(8));
+        assertEquals(forSixteen, Downsampling.getSamplingPattern(16));
+    }
+
+    /**
+     * Tells whether {@code index} lies a whole multiple of {@code BASE_SAMPLING_LEVEL} away from any of the
+     * given start points.
+     *
+     * @param index       position to test
+     * @param startPoints offsets to compare against
+     * @return {@code true} as soon as one start point matches, {@code false} if none does
+     */
+    private static boolean shouldSkip(int index, List<Integer> startPoints)
+    {
+        for (int i = 0; i < startPoints.size(); i++)
+        {
+            int distance = index - startPoints.get(i);
+            if (distance % BASE_SAMPLING_LEVEL == 0)
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Builds a summary over 4096 random keys at an index interval of 128 and downsamples it in two ways:
+     * first always from the original summary down to each level, then one level at a time from the previous
+     * result. In both passes the size must equal {@code entriesAtSamplingLevel} and the retained keys must be
+     * the ones that {@code shouldSkip} does not drop.
+     */
+    @Test
+    public void testDownsample()
+    {
+        final int NUM_KEYS = 4096;
+        final int INDEX_INTERVAL = 128;
+        final int ORIGINAL_NUM_ENTRIES = NUM_KEYS / INDEX_INTERVAL;
+
+        Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(NUM_KEYS, INDEX_INTERVAL);
+        List<DecoratedKey> keys = random.left;
+        IndexSummary original = random.right;
+
+        // sanity check on the original index summary
+        for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            assertEquals(keys.get(i * INDEX_INTERVAL).key, ByteBuffer.wrap(original.getKey(i)));
+
+        List<Integer> samplePattern = Downsampling.getSamplingPattern(BASE_SAMPLING_LEVEL);
+
+        // downsample by one level, then two levels, then three levels...
+        for (int samplingLevel = BASE_SAMPLING_LEVEL - 1; samplingLevel >= MIN_SAMPLING_LEVEL; samplingLevel--)
+        {
+            IndexSummary downsampled = downsample(original, samplingLevel, DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(samplingLevel, original.getMaxNumberOfEntries()), downsampled.size());
+
+            // the number of rounds applied so far is the distance from the base level
+            List<Integer> skipStartPoints = samplePattern.subList(0, BASE_SAMPLING_LEVEL - samplingLevel);
+            assertRetainedKeys(keys, downsampled, skipStartPoints, ORIGINAL_NUM_ENTRIES, INDEX_INTERVAL);
+        }
+
+        // downsample one level each time
+        IndexSummary previous = original;
+        for (int downsampleLevel = BASE_SAMPLING_LEVEL - 1; downsampleLevel >= MIN_SAMPLING_LEVEL; downsampleLevel--)
+        {
+            IndexSummary downsampled = downsample(previous, downsampleLevel, DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(downsampleLevel, original.getMaxNumberOfEntries()), downsampled.size());
+
+            List<Integer> skipStartPoints = samplePattern.subList(0, BASE_SAMPLING_LEVEL - downsampleLevel);
+            assertRetainedKeys(keys, downsampled, skipStartPoints, ORIGINAL_NUM_ENTRIES, INDEX_INTERVAL);
+
+            previous = downsampled;
+        }
+    }
+
+    /**
+     * Walks the original entry positions and asserts that every position not dropped by {@code shouldSkip}
+     * appears, in order, as the next key of {@code downsampled}.
+     *
+     * @param keys               all generated keys, in insertion order
+     * @param downsampled        summary under test
+     * @param skipStartPoints    start points of the downsampling rounds applied so far
+     * @param originalNumEntries number of entries in the summary before downsampling
+     * @param indexInterval      number of keys between two entries of the original summary
+     */
+    private static void assertRetainedKeys(List<DecoratedKey> keys, IndexSummary downsampled, List<Integer> skipStartPoints, int originalNumEntries, int indexInterval)
+    {
+        int sampledCount = 0;
+        for (int i = 0; i < originalNumEntries; i++)
+        {
+            if (shouldSkip(i, skipStartPoints))
+                continue;
+
+            assertEquals(keys.get(i * indexInterval).key, ByteBuffer.wrap(downsampled.getKey(sampledCount)));
+            sampledCount++;
+        }
+    }
+
+    /**
+     * Checks {@code Downsampling.getOriginalIndexes}: the result has one element per sampling level, the base
+     * level keeps every index, the first round removes index 0, and a few low levels match hard-coded values.
+     */
+    @Test
+    public void testOriginalIndexLookup()
+    {
+        for (int i = BASE_SAMPLING_LEVEL; i >= MIN_SAMPLING_LEVEL; i--)
+            assertEquals(i, Downsampling.getOriginalIndexes(i).size());
+
+        ArrayList<Integer> full = new ArrayList<>();
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            full.add(i);
+
+        assertEquals(full, Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL));
+        // the entry at index 0 is the first to go
+        assertEquals(full.subList(1, full.size()), Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL - 1));
+
+        // spot check a few values (these depend on BASE_SAMPLING_LEVEL being 128); a JUnit assertion is used
+        // because the assert keyword is skipped when the JVM runs without -ea
+        assertEquals("spot checks assume BASE_SAMPLING_LEVEL is 128", 128, BASE_SAMPLING_LEVEL);
+        assertEquals(Arrays.asList(31, 63, 95, 127), Downsampling.getOriginalIndexes(4));
+        assertEquals(Arrays.asList(63, 127), Downsampling.getOriginalIndexes(2));
+        assertEquals(Collections.<Integer>emptyList(), Downsampling.getOriginalIndexes(0));
+    }
+
+    /**
+     * Checks {@code Downsampling.getEffectiveIndexIntervalAfterIndex} at the base level, after one round of
+     * downsampling, and at sampling level 2.
+     */
+    @Test
+    public void testGetNumberOfSkippedEntriesAfterIndex()
+    {
+        int indexInterval = 128;
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL, indexInterval));
+
+        // with one round of downsampling, only the first summary has been removed, so only the last index will have
+        // double the gap until the next sample
+        for (int i = 0; i < BASE_SAMPLING_LEVEL - 2; i++)
+            assertEquals(indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL - 1, indexInterval));
+        assertEquals(indexInterval * 2, Downsampling.getEffectiveIndexIntervalAfterIndex(BASE_SAMPLING_LEVEL - 2, BASE_SAMPLING_LEVEL - 1, indexInterval));
+
+        // at samplingLevel=2, the retained summary points are [63, 127] (assumes BASE_SAMPLING_LEVEL is 128);
+        // a JUnit assertion is used because the assert keyword is skipped when the JVM runs without -ea
+        assertEquals("expected values assume BASE_SAMPLING_LEVEL is 128", 128, BASE_SAMPLING_LEVEL);
+        assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval));
+        assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index b771e72..6bf17fd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import org.junit.Assert;
import com.google.common.collect.Sets;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@ -54,11 +57,17 @@ import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
@RunWith(OrderedJUnit4ClassRunner.class)
public class SSTableReaderTest extends SchemaLoader
{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableReaderTest.class);
+
static Token t(int i)
{
return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
@@ -252,8 +261,8 @@ public class SSTableReaderTest extends SchemaLoader
// test to see if sstable can be opened as expected
SSTableReader target = SSTableReader.open(desc);
- Assert.assertEquals(target.getKeySampleSize(), 1);
- Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getKeySample(0));
+ Assert.assertEquals(target.getIndexSummarySize(), 1);
+ Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getIndexSummaryKey(0));
assert target.first.equals(firstKey);
assert target.last.equals(lastKey);
}
@@ -341,6 +350,64 @@ public class SSTableReaderTest extends SchemaLoader
assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
}
+ @Test
+ public void testIndexSummaryReplacement() throws IOException, ExecutionException, InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open("Keyspace1");
+ final ColumnFamilyStore store = keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 8, no key caching
+ CompactionManager.instance.disableAutoCompaction();
+
+ final int NUM_ROWS = 1000;
+ for (int j = 0; j < NUM_ROWS; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+ RowMutation rm = new RowMutation("Keyspace1", key);
+ rm.add("StandardLowIndexInterval", ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes(String.format("%3d", j)), j);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store);
+
+ Collection<SSTableReader> sstables = store.getSSTables();
+ assert sstables.size() == 1;
+ final SSTableReader sstable = sstables.iterator().next();
+
+ ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
+ List<Future> futures = new ArrayList<>(NUM_ROWS * 2);
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", i));
+ final int index = i;
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ ColumnFamily result = store.getColumnFamily(sstable.partitioner.decorateKey(key), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100, 100);
+ assertFalse(result.isEmpty());
+ assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), result.getColumn(ByteBufferUtil.bytes("0")).value()));
+ }
+ }));
+
+ futures.add(executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ Iterable<DecoratedKey> results = store.keySamples(
+ new Range<>(sstable.partitioner.getMinimumToken(), sstable.partitioner.getToken(key)));
+ assertTrue(results.iterator().hasNext());
+ }
+ }));
+ }
+
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(Downsampling.MIN_SAMPLING_LEVEL);
+ store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement));
+ for (Future future : futures)
+ future.get();
+
+ assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
+ }
+
private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws IOException
{
assert "Indexed1".equals(indexedCFS.name);
[2/2] git commit: allocate fixed index summary memory pool and
resample cold index summaries to use less memory patch by Tyler Hobbs;
reviewed by jbellis for CASSANDRA-5519
Posted by jb...@apache.org.
allocate fixed index summary memory pool and resample cold index summaries to use less memory
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5519
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbd1a727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbd1a727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbd1a727
Branch: refs/heads/trunk
Commit: dbd1a727b7481a3dcd9867e3a6f7791c1095e12a
Parents: 40598ef
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 22 12:09:20 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 22 12:12:55 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
conf/cassandra.yaml | 14 +
.../org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 21 +
.../org/apache/cassandra/db/DataTracker.java | 37 +-
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../apache/cassandra/io/sstable/Descriptor.java | 7 +-
.../cassandra/io/sstable/Downsampling.java | 150 +++++++
.../cassandra/io/sstable/IndexSummary.java | 137 +++++-
.../io/sstable/IndexSummaryBuilder.java | 150 ++++++-
.../io/sstable/IndexSummaryManager.java | 434 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerMBean.java | 48 ++
.../cassandra/io/sstable/SSTableReader.java | 236 +++++++---
.../cassandra/io/sstable/SSTableWriter.java | 2 +-
.../cassandra/service/StorageService.java | 4 +-
.../unit/org/apache/cassandra/SchemaLoader.java | 3 +-
.../io/sstable/IndexSummaryManagerTest.java | 327 ++++++++++++++
.../cassandra/io/sstable/IndexSummaryTest.java | 151 ++++++-
.../cassandra/io/sstable/SSTableReaderTest.java | 79 +++-
19 files changed, 1694 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79e5880..f54afe1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
* Removed multithreaded compaction (CASSANDRA-6142)
* Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
* change logging from log4j to logback (CASSANDRA-5883)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 53d091d..180fcd5 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -269,6 +269,20 @@ concurrent_writes: 32
# the maximum number of secondary indexes created on a single CF.
memtable_flush_queue_size: 4
+# A fixed memory pool size in MB for SSTable index summaries. If left
+# empty, this will default to 5% of the heap size. If the memory usage of
+# all index summaries exceeds this limit, SSTables with low read rates will
+# shrink their index summaries in order to meet this limit. However, this
+# is a best-effort process. In extreme conditions Cassandra may need to use
+# more than this amount of memory.
+index_summary_capacity_in_mb:
+
+# How frequently index summaries should be resampled. This is done
+# periodically to redistribute memory from the fixed-size pool to sstables
+# proportional to their recent read rates. Setting to -1 will disable this
+# process, leaving existing index summaries at their current sampling level.
+index_summary_resize_interval_in_minutes: 60
+
# Whether to, when doing sequential writing, fsync() at intervals in
# order to force the operating system to flush the dirty
# buffers. Enable this to avoid sudden dirty buffer flushing from
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f41a112..c48d652 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -183,6 +183,9 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ public volatile Long index_summary_capacity_in_mb;
+ public volatile int index_summary_resize_interval_in_minutes = 60;
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d412032..3a4ab62 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.IAllocator;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -86,6 +87,7 @@ public class DatabaseDescriptor
private static long keyCacheSizeInMB;
private static IAllocator memoryAllocator;
+ private static long indexSummaryCapacityInMB;
private static String localDC;
private static Comparator<InetAddress> localComparator;
@@ -449,6 +451,15 @@ public class DatabaseDescriptor
+ conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
}
+ // if set to empty/"auto" then use 5% of Heap size
+ indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+ ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+ : conf.index_summary_capacity_in_mb;
+
+ if (indexSummaryCapacityInMB < 0)
+ throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.");
+
memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
if(conf.encryption_options != null)
@@ -1221,6 +1232,11 @@ public class DatabaseDescriptor
return keyCacheSizeInMB;
}
+ public static long getIndexSummaryCapacityInMB()
+ {
+ return indexSummaryCapacityInMB;
+ }
+
public static int getKeyCacheSavePeriod()
{
return conf.key_cache_save_period;
@@ -1312,4 +1328,9 @@ public class DatabaseDescriptor
throw new RuntimeException(e);
}
}
+
+ public static int getIndexSummaryResizeIntervalInMinutes()
+ {
+ return conf.index_summary_resize_interval_in_minutes;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 1c25f44..64b088d 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -328,6 +328,41 @@ public class DataTracker
SSTableIntervalTree.empty()));
}
+ /**
+ * A special kind of replacement for SSTableReaders that were cloned with a new index summary sampling level (see
+ * SSTableReader.cloneWithNewSummarySamplingLevel and CASSANDRA-5519). This does not mark the old reader
+ * as compacted.
+ * @param oldSSTables replaced readers
+ * @param newSSTables replacement readers
+ */
+ public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables)
+ {
+ // data component will be unchanged but the index summary will be a different size
+ // (since we save that to make restart fast)
+ long sizeIncrease = 0;
+ for (SSTableReader sstable : oldSSTables)
+ sizeIncrease -= sstable.bytesOnDisk();
+ for (SSTableReader sstable : newSSTables)
+ sizeIncrease += sstable.bytesOnDisk();
+
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ newView = currentView.replace(oldSSTables, newSSTables);
+ }
+ while (!view.compareAndSet(currentView, newView));
+
+ StorageMetrics.load.inc(sizeIncrease);
+ cfstore.metric.liveDiskSpaceUsed.inc(sizeIncrease);
+
+ for (SSTableReader sstable : newSSTables)
+ sstable.setTrackedBy(this);
+
+ for (SSTableReader sstable : oldSSTables)
+ sstable.releaseReference();
+ }
+
private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
if (!cfstore.isValid())
@@ -595,4 +630,4 @@ public class DataTracker
return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b63caab..dac3d47 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -283,7 +283,7 @@ public abstract class AbstractCompactionStrategy
else
{
// what percentage of columns do we expect to compact outside of overlap?
- if (sstable.getKeySampleSize() < 2)
+ if (sstable.getIndexSummarySize() < 2)
{
// we have too few samples to estimate correct percentage
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 1b29c1c..fef6a1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -43,8 +43,8 @@ public class Descriptor
// we always incremented the major version.
public static class Version
{
- // This needs to be at the begining for initialization sake
- public static final String current_version = "jb";
+ // This needs to be at the beginning for initialization sake
+ public static final String current_version = "jc";
// ic (1.2.5): omits per-row bloom filter of column names
// ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -57,6 +57,7 @@ public class Descriptor
// tracks max/min column values (according to comparator)
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
// checksum the compressed data
+ // jc (2.1.0): index summaries can be downsampled and the sampling level is persisted
public static final Version CURRENT = new Version(current_version);
@@ -70,6 +71,7 @@ public class Descriptor
public final boolean hasRowSizeAndColumnCount;
public final boolean tracksMaxMinColumnNames;
public final boolean hasPostCompressionAdlerChecksums;
+ public final boolean hasSamplingLevel;
public Version(String version)
{
@@ -82,6 +84,7 @@ public class Descriptor
hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
+ hasSamplingLevel = version.compareTo("jc") >= 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/Downsampling.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Downsampling.java b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
new file mode 100644
index 0000000..62ca1be
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
@@ -0,0 +1,150 @@
+package org.apache.cassandra.io.sstable;
+
+import java.util.*;
+
+public class Downsampling
+{
+ /**
+ * The base (down)sampling level determines the granularity at which we can down/upsample.
+ *
+ * A higher number allows us to approximate more closely the ideal sampling. (It could also mean we do a lot of
+ * expensive almost-no-op resamplings from N to N-1, but the thresholds in IndexSummaryManager prevent that.)
+ *
+ * BSL must be a power of two in order to have good sampling patterns. This cannot be changed without rebuilding
+ * all index summaries at full sampling; for now we treat it as a constant.
+ */
+ public static final int BASE_SAMPLING_LEVEL = 128;
+
+ /**
+ * The lowest level we will downsample to: the coarsest summary will have (MSL / BSL) entries left.
+ *
+ * This can be anywhere from 1 to the base sampling level.
+ */
+ public static final int MIN_SAMPLING_LEVEL = 8;
+
+ private static final Map<Integer, List<Integer>> samplePatternCache = new HashMap<>();
+
+ private static final Map<Integer, List<Integer>> originalIndexCache = new HashMap<>();
+
+ /**
+ * Gets a list L of starting indices for downsampling rounds: the first round should start with the offset
+ * given by L[0], the second by the offset in L[1], etc.
+ *
+ * @param samplingLevel the base sampling level
+ *
+ * @return A list of `samplingLevel` unique indices between 0 and `samplingLevel`
+ */
+ public static List<Integer> getSamplingPattern(int samplingLevel)
+ {
+ List<Integer> pattern = samplePatternCache.get(samplingLevel);
+ if (pattern != null)
+ return pattern;
+
+ if (samplingLevel <= 1)
+ return Arrays.asList(0);
+
+ ArrayList<Integer> startIndices = new ArrayList<>(samplingLevel);
+ startIndices.add(0);
+
+ int spread = samplingLevel;
+ while (spread >= 2)
+ {
+ ArrayList<Integer> roundIndices = new ArrayList<>(samplingLevel / spread);
+ for (int i = spread / 2; i < samplingLevel; i += spread)
+ roundIndices.add(i);
+
+ // especially for later rounds, it's important that we spread out the start points, so we'll
+ // make a recursive call to get an ordering for this list of start points
+ List<Integer> roundIndicesOrdering = getSamplingPattern(roundIndices.size());
+ for (int i = 0; i < roundIndices.size(); ++i)
+ startIndices.add(roundIndices.get(roundIndicesOrdering.get(i)));
+
+ spread /= 2;
+ }
+
+ samplePatternCache.put(samplingLevel, startIndices);
+ return startIndices;
+ }
+
+ /**
+ * Returns a list that can be used to translate current index summary indexes to their original index before
+ * downsampling. (This repeats every `samplingLevel`, so that's how many entries we return.)
+ *
+ * For example, if [7, 15] is returned, the current index summary entry at index 0 was originally
+ * at index 7, and the current index 1 was originally at index 15.
+ *
+ * @param samplingLevel the current sampling level for the index summary
+ *
+ * @return a list of original indexes for current summary entries
+ */
+ public static List<Integer> getOriginalIndexes(int samplingLevel)
+ {
+ List<Integer> originalIndexes = originalIndexCache.get(samplingLevel);
+ if (originalIndexes != null)
+ return originalIndexes;
+
+ List<Integer> pattern = getSamplingPattern(BASE_SAMPLING_LEVEL).subList(0, BASE_SAMPLING_LEVEL - samplingLevel);
+ originalIndexes = new ArrayList<>(samplingLevel);
+ for (int j = 0; j < BASE_SAMPLING_LEVEL; j++)
+ {
+ if (!pattern.contains(j))
+ originalIndexes.add(j);
+ }
+
+ originalIndexCache.put(samplingLevel, originalIndexes);
+ return originalIndexes;
+ }
+
+ /**
+ * Calculates the effective index interval after the entry at `index` in an IndexSummary. In other words, this
+ * returns the number of partitions in the primary on-disk index before the next partition that has an entry in
+ * the index summary. If samplingLevel == BASE_SAMPLING_LEVEL, this will be equal to the index interval.
+ * @param index an index into an IndexSummary
+ * @param samplingLevel the current sampling level for that IndexSummary
+ * @param indexInterval the index interval
+ * @return the number of partitions before the next index summary entry, inclusive on one end
+ */
+ public static int getEffectiveIndexIntervalAfterIndex(int index, int samplingLevel, int indexInterval)
+ {
+ assert index >= -1;
+ List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
+ if (index == -1)
+ return originalIndexes.get(0) * indexInterval;
+
+ index %= samplingLevel;
+ if (index == originalIndexes.size() - 1)
+ {
+ // account for partitions after the "last" entry as well as partitions before the "first" entry
+ return ((BASE_SAMPLING_LEVEL - originalIndexes.get(index)) + originalIndexes.get(0)) * indexInterval;
+ }
+ else
+ {
+ return (originalIndexes.get(index + 1) - originalIndexes.get(index)) * indexInterval;
+ }
+ }
+
+ public static int[] getStartPoints(int currentSamplingLevel, int newSamplingLevel)
+ {
+ List<Integer> allStartPoints = getSamplingPattern(BASE_SAMPLING_LEVEL);
+
+ // calculate starting indexes for sampling rounds
+ int initialRound = BASE_SAMPLING_LEVEL - currentSamplingLevel;
+ int numRounds = Math.abs(currentSamplingLevel - newSamplingLevel);
+ int[] startPoints = new int[numRounds];
+ for (int i = 0; i < numRounds; ++i)
+ {
+ int start = allStartPoints.get(initialRound + i);
+
+ // our "ideal" start points will be affected by the removal of items in earlier rounds, so go through all
+ // earlier rounds, and if we see an index that comes before our ideal start point, decrement the start point
+ int adjustment = 0;
+ for (int j = 0; j < initialRound; ++j)
+ {
+ if (allStartPoints.get(j) < start)
+ adjustment++;
+ }
+ startPoints[i] = start - adjustment;
+ }
+ return startPoints;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index be7977e..4fc4737 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -23,6 +23,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
@@ -31,27 +34,52 @@ import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+/*
+ * Layout of Memory for index summaries:
+ *
+ * There are two sections:
+ * 1. A "header" containing the offset into `bytes` of entries in the summary data, consisting of
+ * one four byte position for each entry in the summary. This allows us to do simple math in getPositionInSummary()
+ * to find the position in the Memory to start reading the actual index summary entry.
+ * (This is necessary because keys can have different lengths.)
+ * 2. A sequence of (DecoratedKey, position) pairs, where position is the offset into the actual index file.
+ */
public class IndexSummary implements Closeable
{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
+
public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
private final int indexInterval;
private final IPartitioner partitioner;
- private final int summary_size;
+ private final int summarySize;
+ private final int sizeAtFullSampling;
private final Memory bytes;
- public IndexSummary(IPartitioner partitioner, Memory memory, int summary_size, int indexInterval)
+ /**
+ * A value between MIN_SAMPLING_LEVEL and BASE_SAMPLING_LEVEL that represents how many of the original
+ * index summary entries ((1 / indexInterval) * numKeys) have been retained.
+ *
+ * Thus, this summary contains (samplingLevel / BASE_SAMPLING_LEVEL) * ((1 / indexInterval) * numKeys) entries.
+ */
+ private final int samplingLevel;
+
+ public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling, int indexInterval, int samplingLevel)
{
this.partitioner = partitioner;
this.indexInterval = indexInterval;
- this.summary_size = summary_size;
+ this.summarySize = summarySize;
+ this.sizeAtFullSampling = sizeAtFullSampling;
this.bytes = memory;
+ this.samplingLevel = samplingLevel;
}
// binary search is notoriously more difficult to get right than it looks; this is lifted from
// Harmony's Collections implementation
public int binarySearch(RowPosition key)
{
- int low = 0, mid = summary_size, high = mid - 1, result = -1;
+ int low = 0, mid = summarySize, high = mid - 1, result = -1;
while (low <= high)
{
mid = (low + high) >> 1;
@@ -73,16 +101,21 @@ public class IndexSummary implements Closeable
return -mid - (result < 0 ? 1 : 2);
}
- public int getIndex(int index)
+ /**
+ * Gets the position of the actual index summary entry in our Memory attribute, 'bytes'.
+ * @param index The index of the entry or key to get the position for
+ * @return an offset into our Memory attribute where the actual entry resides
+ */
+ public int getPositionInSummary(int index)
{
- // multiply by 4.
+ // The first section of bytes holds a four-byte position for each entry in the summary, so just multiply by 4.
return bytes.getInt(index << 2);
}
public byte[] getKey(int index)
{
- long start = getIndex(index);
- int keySize = (int) (caclculateEnd(index) - start - 8L);
+ long start = getPositionInSummary(index);
+ int keySize = (int) (calculateEnd(index) - start - 8L);
byte[] key = new byte[keySize];
bytes.getBytes(start, key, 0, keySize);
return key;
@@ -90,12 +123,21 @@ public class IndexSummary implements Closeable
public long getPosition(int index)
{
- return bytes.getLong(caclculateEnd(index) - 8);
+ return bytes.getLong(calculateEnd(index) - 8);
}
- private long caclculateEnd(int index)
+ public byte[] getEntry(int index)
{
- return index == (summary_size - 1) ? bytes.size() : getIndex(index + 1);
+ long start = getPositionInSummary(index);
+ long end = calculateEnd(index);
+ byte[] entry = new byte[(int)(end - start)];
+ bytes.getBytes(start, entry, 0, (int)(end - start));
+ return entry;
+ }
+
+ private long calculateEnd(int index)
+ {
+ return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
}
public int getIndexInterval()
@@ -105,27 +147,80 @@ public class IndexSummary implements Closeable
public int size()
{
- return summary_size;
+ return summarySize;
+ }
+
+ public int getSamplingLevel()
+ {
+ return samplingLevel;
+ }
+
+ /**
+ * Returns the number of entries this summary would have if it were at the full sampling level, which is equal
+ * to the number of entries in the primary on-disk index divided by the index interval.
+ */
+ public int getMaxNumberOfEntries()
+ {
+ return sizeAtFullSampling;
+ }
+
+ /**
+ * Returns the amount of off-heap memory used for this summary.
+ * @return size in bytes
+ */
+ public long getOffHeapSize()
+ {
+ return bytes.size();
+ }
+
+ /**
+ * Returns the number of primary (on-disk) index entries between the index summary entry at `index` and the next
+ * index summary entry (assuming there is one). Without any downsampling, this will always be equivalent to
+ * the index interval.
+ *
+ * @param index the index of an index summary entry (between zero and the index entry size)
+ *
+ * @return the number of partitions after `index` until the next partition with a summary entry
+ */
+ public int getEffectiveIndexIntervalAfterIndex(int index)
+ {
+ return Downsampling.getEffectiveIndexIntervalAfterIndex(index, samplingLevel, indexInterval);
}
public static class IndexSummarySerializer
{
- public void serialize(IndexSummary t, DataOutputStream out) throws IOException
+ public void serialize(IndexSummary t, DataOutputStream out, boolean withSamplingLevel) throws IOException
{
out.writeInt(t.indexInterval);
- out.writeInt(t.summary_size);
+ out.writeInt(t.summarySize);
out.writeLong(t.bytes.size());
+ if (withSamplingLevel)
+ {
+ out.writeInt(t.samplingLevel);
+ out.writeInt(t.sizeAtFullSampling);
+ }
FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
}
- public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner) throws IOException
+ public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
{
int indexInterval = in.readInt();
- int summary_size = in.readInt();
- long offheap_size = in.readLong();
- Memory memory = Memory.allocate(offheap_size);
- FBUtilities.copy(in, new MemoryOutputStream(memory), offheap_size);
- return new IndexSummary(partitioner, memory, summary_size, indexInterval);
+ int summarySize = in.readInt();
+ long offheapSize = in.readLong();
+ int samplingLevel, fullSamplingSummarySize;
+ if (haveSamplingLevel)
+ {
+ samplingLevel = in.readInt();
+ fullSamplingSummarySize = in.readInt();
+ }
+ else
+ {
+ samplingLevel = BASE_SAMPLING_LEVEL;
+ fullSamplingSummarySize = summarySize;
+ }
+ Memory memory = Memory.allocate(offheapSize);
+ FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
+ return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, indexInterval, samplingLevel);
}
}
@@ -134,4 +229,4 @@ public class IndexSummary implements Closeable
{
bytes.free();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index e7b9e11..3635e7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
-import java.util.ArrayList;
+import java.util.*;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.TypeSizes;
@@ -27,6 +27,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+
public class IndexSummaryBuilder
{
private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
@@ -34,12 +37,18 @@ public class IndexSummaryBuilder
private final ArrayList<Long> positions;
private final ArrayList<byte[]> keys;
private final int indexInterval;
+ private final int samplingLevel;
+ private final int[] startPoints;
private long keysWritten = 0;
+ private long indexIntervalMatches = 0;
private long offheapSize = 0;
- public IndexSummaryBuilder(long expectedKeys, int indexInterval)
+ public IndexSummaryBuilder(long expectedKeys, int indexInterval, int samplingLevel)
{
this.indexInterval = indexInterval;
+ this.samplingLevel = samplingLevel;
+ this.startPoints = Downsampling.getStartPoints(BASE_SAMPLING_LEVEL, samplingLevel);
+
long expectedEntries = expectedKeys / indexInterval;
if (expectedEntries > Integer.MAX_VALUE)
{
@@ -50,19 +59,39 @@ public class IndexSummaryBuilder
logger.warn("Index interval of {} is too low for {} expected keys; using interval of {} instead",
indexInterval, expectedKeys, effectiveInterval);
}
- positions = new ArrayList<Long>((int)expectedEntries);
- keys = new ArrayList<byte[]>((int)expectedEntries);
+
+ // adjust our estimates based on the sampling level
+ expectedEntries = (expectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
+
+ positions = new ArrayList<>((int)expectedEntries);
+ keys = new ArrayList<>((int)expectedEntries);
}
public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
{
if (keysWritten % indexInterval == 0)
{
- byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
- keys.add(key);
- offheapSize += key.length;
- positions.add(indexPosition);
- offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
+ indexIntervalMatches++;
+
+ // see if we should skip this key based on our sampling level
+ boolean shouldSkip = false;
+ for (int start : startPoints)
+ {
+ if ((indexIntervalMatches - start) % BASE_SAMPLING_LEVEL == 0)
+ {
+ shouldSkip = true;
+ break;
+ }
+ }
+
+ if (!shouldSkip)
+ {
+ byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
+ keys.add(key);
+ offheapSize += key.length;
+ positions.add(indexPosition);
+ offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
+ }
}
keysWritten++;
@@ -74,21 +103,110 @@ public class IndexSummaryBuilder
assert keys.size() > 0;
assert keys.size() == positions.size();
+ // first we write out the position in the *summary* for each key in the summary,
+ // then we write out (key, actual index position) pairs
Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
int idxPosition = 0;
int keyPosition = keys.size() * 4;
for (int i = 0; i < keys.size(); i++)
{
+ // write the position of the actual entry in the index summary (4 bytes)
+ memory.setInt(idxPosition, keyPosition);
+ idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
+
+ // write the key
+ byte[] keyBytes = keys.get(i);
+ memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
+ keyPosition += keyBytes.length;
+
+ // write the position in the actual index file
+ long actualIndexPosition = positions.get(i);
+ memory.setLong(keyPosition, actualIndexPosition);
+ keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+ }
+ int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) indexInterval);
+ return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, indexInterval, samplingLevel);
+ }
+
+ public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
+ {
+ return (samplingLevel * maxSummarySize) / BASE_SAMPLING_LEVEL;
+ }
+
+ public static int calculateSamplingLevel(int currentSamplingLevel, int currentNumEntries, long targetNumEntries)
+ {
+ // Algebraic explanation for calculating the new sampling level (solve for newSamplingLevel):
+ // originalNumEntries = (baseSamplingLevel / currentSamplingLevel) * currentNumEntries
+ // newSpaceUsed = (newSamplingLevel / baseSamplingLevel) * originalNumEntries
+ // newSpaceUsed = (newSamplingLevel / baseSamplingLevel) * (baseSamplingLevel / currentSamplingLevel) * currentNumEntries
+ // newSpaceUsed = (newSamplingLevel / currentSamplingLevel) * currentNumEntries
+ // (newSpaceUsed * currentSamplingLevel) / currentNumEntries = newSamplingLevel
+ int newSamplingLevel = (int) (targetNumEntries * currentSamplingLevel) / currentNumEntries;
+ return Math.min(BASE_SAMPLING_LEVEL, Math.max(MIN_SAMPLING_LEVEL, newSamplingLevel));
+ }
+
+ /**
+ * Downsamples an existing index summary to a new sampling level.
+ * @param existing an existing IndexSummary
+ * @param newSamplingLevel the target level for the new IndexSummary. This must be less than the current sampling
+ * level for `existing`.
+ * @param partitioner the partitioner used for the index summary
+ * @return a new IndexSummary
+ */
+ public static IndexSummary downsample(IndexSummary existing, int newSamplingLevel, IPartitioner partitioner)
+ {
+ // To downsample the old index summary, we'll go through (potentially) several rounds of downsampling.
+ // Conceptually, each round starts at position X and then removes every Nth item. The value of X follows
+ // a particular pattern to evenly space out the items that we remove. The value of N decreases by one each
+ // round.
+
+ int currentSamplingLevel = existing.getSamplingLevel();
+ assert currentSamplingLevel > newSamplingLevel;
+
+ // calculate starting indexes for downsampling rounds
+ int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
+
+ // calculate new off-heap size
+ int removedKeyCount = 0;
+ long newOffHeapSize = existing.getOffHeapSize();
+ for (int start : startPoints)
+ {
+ for (int j = start; j < existing.size(); j += currentSamplingLevel)
+ {
+ removedKeyCount++;
+ newOffHeapSize -= existing.getEntry(j).length;
+ }
+ }
+
+ int newKeyCount = existing.size() - removedKeyCount;
+
+ // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
+ // stores the position of the actual entries in the summary.
+ Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+
+ // Copy old entries to our new Memory.
+ int idxPosition = 0;
+ int keyPosition = newKeyCount * 4;
+ outer:
+ for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
+ {
+ // to determine if we can skip this entry, go through the starting points for our downsampling rounds
+ // and see if the entry's index is covered by that round
+ for (int start : startPoints)
+ {
+ if ((oldSummaryIndex - start) % currentSamplingLevel == 0)
+ continue outer;
+ }
+
+ // write the position of the actual entry in the index summary (4 bytes)
memory.setInt(idxPosition, keyPosition);
idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
- byte[] temp = keys.get(i);
- memory.setBytes(keyPosition, temp, 0, temp.length);
- keyPosition += temp.length;
- long tempPosition = positions.get(i);
- memory.setLong(keyPosition, tempPosition);
- keyPosition += TypeSizes.NATIVE.sizeof(tempPosition);
+ // write the entry itself
+ byte[] entry = existing.getEntry(oldSummaryIndex);
+ memory.setBytes(keyPosition, entry, 0, entry.length);
+ keyPosition += entry.length;
}
- return new IndexSummary(partitioner, memory, keys.size(), indexInterval);
+ return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(), existing.getIndexInterval(), newSamplingLevel);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
new file mode 100644
index 0000000..1b36baf
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Manages the fixed-size memory pool for index summaries, periodically resizing them
+ * in order to give more memory to hot sstables and less memory to cold sstables.
+ */
+public class IndexSummaryManager implements IndexSummaryManagerMBean
+{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManager.class);
+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=IndexSummaries";
+ public static final IndexSummaryManager instance;
+
+ private int resizeIntervalInMinutes = 0;
+ private long memoryPoolBytes;
+
+ // The target (or ideal) number of index summary entries must differ from the actual number of
+ // entries by this ratio in order to trigger an upsample or downsample of the summary. Because
+ // upsampling requires reading the primary index in order to rebuild the summary, the threshold
+ // for upsampling is higher.
+ static final double UPSAMPLE_THRESHOLD = 1.5;
+ static final double DOWNSAMPLE_THESHOLD = 0.75;
+
+ private final DebuggableScheduledThreadPoolExecutor executor;
+
+ // our next scheduled resizing run
+ private ScheduledFuture future;
+
+ static
+ {
+ instance = new IndexSummaryManager();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+ try
+ {
+ mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private IndexSummaryManager()
+ {
+ executor = new DebuggableScheduledThreadPoolExecutor(1, "IndexSummaryManager", Thread.MIN_PRIORITY);
+
+ long indexSummarySizeInMB = DatabaseDescriptor.getIndexSummaryCapacityInMB();
+ int interval = DatabaseDescriptor.getIndexSummaryResizeIntervalInMinutes();
+ logger.info("Initializing index summary manager with a memory pool size of {} MB and a resize interval of {} minutes",
+ indexSummarySizeInMB, interval);
+
+ setMemoryPoolCapacityInMB(DatabaseDescriptor.getIndexSummaryCapacityInMB());
+ setResizeIntervalInMinutes(DatabaseDescriptor.getIndexSummaryResizeIntervalInMinutes());
+ }
+
+ public int getResizeIntervalInMinutes()
+ {
+ return resizeIntervalInMinutes;
+ }
+
+ public void setResizeIntervalInMinutes(int resizeIntervalInMinutes)
+ {
+ int oldInterval = this.resizeIntervalInMinutes;
+ this.resizeIntervalInMinutes = resizeIntervalInMinutes;
+
+ long initialDelay;
+ if (future != null)
+ {
+ initialDelay = oldInterval < 0
+ ? resizeIntervalInMinutes
+ : Math.max(0, resizeIntervalInMinutes - (oldInterval - future.getDelay(TimeUnit.MINUTES)));
+ future.cancel(false);
+ }
+ else
+ {
+ initialDelay = resizeIntervalInMinutes;
+ }
+
+ if (this.resizeIntervalInMinutes < 0)
+ {
+ future = null;
+ return;
+ }
+
+ future = executor.scheduleWithFixedDelay(new WrappedRunnable()
+ {
+ protected void runMayThrow() throws Exception
+ {
+ redistributeSummaries();
+ }
+ }, initialDelay, resizeIntervalInMinutes, TimeUnit.MINUTES);
+ }
+
+ // for testing only
+ @VisibleForTesting
+ Long getTimeToNextResize(TimeUnit timeUnit)
+ {
+ if (future == null)
+ return null;
+
+ return future.getDelay(timeUnit);
+ }
+
+ public long getMemoryPoolCapacityInMB()
+ {
+ return memoryPoolBytes / 1024L / 1024L;
+ }
+
+ public Map<String, Double> getSamplingRatios()
+ {
+ List<SSTableReader> sstables = getAllSSTables();
+ Map<String, Double> ratios = new HashMap<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ ratios.put(sstable.getFilename(), sstable.getIndexSummarySamplingLevel() / (double) Downsampling.BASE_SAMPLING_LEVEL);
+
+ return ratios;
+ }
+
+ public double getAverageSamplingRatio()
+ {
+ List<SSTableReader> sstables = getAllSSTables();
+ double total = 0.0;
+ for (SSTableReader sstable : sstables)
+ total += sstable.getIndexSummarySamplingLevel() / (double) Downsampling.BASE_SAMPLING_LEVEL;
+ return total / sstables.size();
+ }
+
+ public void setMemoryPoolCapacityInMB(long memoryPoolCapacityInMB)
+ {
+ this.memoryPoolBytes = memoryPoolCapacityInMB * 1024L * 1024L;
+ }
+
+ /**
+ * Returns the actual space consumed by index summaries for all sstables.
+ * @return space currently used in MB
+ */
+ public double getMemoryPoolSizeInMB()
+ {
+ long total = 0;
+ for (SSTableReader sstable : getAllSSTables())
+ total += sstable.getIndexSummaryOffHeapSize();
+ return total / 1024.0 / 1024.0;
+ }
+
+ private List<SSTableReader> getAllSSTables()
+ {
+ List<SSTableReader> result = new ArrayList<>();
+ for (Keyspace ks : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
+ result.addAll(cfStore.getSSTables());
+ }
+
+ return result;
+ }
+
+ /**
+ * Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as
+ * compacting.
+ */
+ private Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> getCompactingAndNonCompactingSSTables()
+ {
+ List<SSTableReader> allCompacting = new ArrayList<>();
+ Multimap<DataTracker, SSTableReader> allNonCompacting = HashMultimap.create();
+ for (Keyspace ks : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
+ {
+ Set<SSTableReader> nonCompacting, allSSTables;
+ do
+ {
+ allSSTables = cfStore.getDataTracker().getSSTables();
+ nonCompacting = Sets.newHashSet(cfStore.getDataTracker().getUncompactingSSTables(allSSTables));
+ }
+ while (!(nonCompacting.isEmpty() || cfStore.getDataTracker().markCompacting(nonCompacting)));
+ allNonCompacting.putAll(cfStore.getDataTracker(), nonCompacting);
+ allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
+ }
+ }
+ return Pair.create(allCompacting, allNonCompacting);
+ }
+
+ public void redistributeSummaries() throws IOException
+ {
+ Pair<List<SSTableReader>, Multimap<DataTracker, SSTableReader>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
+ try
+ {
+ redistributeSummaries(compactingAndNonCompacting.left, Lists.newArrayList(compactingAndNonCompacting.right.values()), this.memoryPoolBytes);
+ }
+ finally
+ {
+ for(DataTracker tracker : compactingAndNonCompacting.right.keySet())
+ tracker.unmarkCompacting(compactingAndNonCompacting.right.get(tracker));
+ }
+ }
+
+ /**
+ * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
+ * their recent read rates.
+ * @param nonCompacting a list of sstables to share the memory pool across
+ * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
+ * under, if possible
+ * @return a list of the non-compacting SSTableReaders after redistribution (unchanged readers plus replacements for resampled ones)
+ */
+ @VisibleForTesting
+ public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
+ {
+ long total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+ total += sstable.getIndexSummaryOffHeapSize();
+
+ logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+ nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+ double totalReadsPerSec = 0.0;
+ for (SSTableReader sstable : nonCompacting)
+ {
+ if (sstable.readMeter != null)
+ {
+ totalReadsPerSec += sstable.readMeter.fifteenMinuteRate();
+ }
+ }
+ logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+ // copy and sort by read rates (ascending)
+ List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+ Collections.sort(sstablesByHotness, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ if (o1.readMeter == null && o2.readMeter == null)
+ return 0;
+ else if (o1.readMeter == null)
+ return -1;
+ else if (o2.readMeter == null)
+ return 1;
+ else
+ return Double.compare(o1.readMeter.fifteenMinuteRate(), o2.readMeter.fifteenMinuteRate());
+ }
+ });
+
+ long remainingBytes = memoryPoolBytes;
+ for (SSTableReader sstable : compacting)
+ remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+ logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+ (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+
+ total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
+ total += sstable.getIndexSummaryOffHeapSize();
+ logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
+ total / 1024.0 / 1024.0);
+
+ return newSSTables;
+ }
+
+ private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+ double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+ {
+
+ List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+ List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+ // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+ // to the number of total reads/sec it handles.
+ long remainingSpace = memoryPoolCapacity;
+ for (SSTableReader sstable : sstables)
+ {
+ double readsPerSec = sstable.readMeter == null ? 0.0 : sstable.readMeter.fifteenMinuteRate();
+ long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+ // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+ int currentNumEntries = sstable.getIndexSummarySize();
+ double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+ long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+ int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+ int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries);
+ int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+
+ logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving from level " +
+ "{} ({} entries) to level {} ({} entries)",
+ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries, newSamplingLevel, numEntriesAtNewSamplingLevel);
+
+ if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
+ else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else
+ {
+ // keep the same sampling level
+ logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+ remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+ newSSTables.add(sstable);
+ }
+ totalReadsPerSec -= readsPerSec;
+ }
+
+ if (remainingSpace > 0)
+ {
+ Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+ toDownsample = result.right;
+ newSSTables.addAll(result.left);
+ }
+
+ // downsample first, then upsample
+ toDownsample.addAll(toUpsample);
+ Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+ Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+ for (ResampleEntry entry : toDownsample)
+ {
+ SSTableReader sstable = entry.sstable;
+ logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(entry.newSamplingLevel);
+ DataTracker tracker = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName()).getDataTracker();
+
+ replacedByTracker.put(tracker, sstable);
+ replacementsByTracker.put(tracker, replacement);
+ }
+
+ for (DataTracker tracker : replacedByTracker.keySet())
+ {
+ tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+ newSSTables.addAll(replacementsByTracker.get(tracker));
+ }
+
+ return newSSTables;
+ }
+
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+ {
+ // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+ // that will make little difference.
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+ {
+ public int compare(ResampleEntry o1, ResampleEntry o2)
+ {
+ return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+ o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+ }
+ });
+
+ int noDownsampleCutoff = 0;
+ List<SSTableReader> willNotDownsample = new ArrayList<>();
+ while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+ {
+ ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+ long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+ // see if we have enough leftover space to keep the current sampling level
+ if (extraSpaceRequired <= remainingSpace)
+ {
+ logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+ entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+ willNotDownsample.add(entry.sstable);
+ remainingSpace -= extraSpaceRequired;
+ }
+ else
+ {
+ break;
+ }
+
+ noDownsampleCutoff++;
+ }
+ return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+ }
+
+ private static class ResampleEntry
+ {
+ public final SSTableReader sstable;
+ public final long newSpaceUsed;
+ public final int newSamplingLevel;
+
+ public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+ {
+ this.sstable = sstable;
+ this.newSpaceUsed = newSpaceUsed;
+ this.newSamplingLevel = newSamplingLevel;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
new file mode 100644
index 0000000..1e115cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManagerMBean.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface IndexSummaryManagerMBean
+{
+ public long getMemoryPoolCapacityInMB();
+ public void setMemoryPoolCapacityInMB(long memoryPoolCapacityInMB);
+
+ /**
+ * Returns the current actual off-heap memory usage of the index summaries for all sstables.
+ * @return The amount of memory used in MB.
+ */
+ public double getMemoryPoolSizeInMB();
+
+ /**
+ * Returns a map of SSTable filenames to their current sampling ratio, where 1.0 indicates that all of the
+ * original index summary entries have been retained and 0.5 indicates that half of the original entries have
+ * been discarded.
+ * @return A map of SSTable filenames to their sampling ratios.
+ */
+ public Map<String, Double> getSamplingRatios();
+
+ public double getAverageSamplingRatio();
+
+ public void redistributeSummaries() throws IOException;
+
+ public int getResizeIntervalInMinutes();
+ public void setResizeIntervalInMinutes(int resizeIntervalInMinutes);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 369e3ea..f82af1f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -119,6 +119,8 @@ public class SSTableReader extends SSTable implements Closeable
// but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
private final AtomicBoolean isCompacted = new AtomicBoolean(false);
private final AtomicBoolean isSuspect = new AtomicBoolean(false);
+ private final AtomicBoolean isReplaced = new AtomicBoolean(false);
+
private final SSTableDeletingTask deletingTask;
// not final since we need to be able to change level on a file.
private volatile SSTableMetadata sstableMetadata;
@@ -136,8 +138,8 @@ public class SSTableReader extends SSTable implements Closeable
for (SSTableReader sstable : sstables)
{
- int indexKeyCount = sstable.getKeySampleSize();
- count = count + (indexKeyCount + 1) * sstable.indexSummary.getIndexInterval();
+ // using getMaxIndexSummarySize() lets us ignore the current sampling level
+ count += (sstable.getMaxIndexSummarySize() + 1) * sstable.indexSummary.getSamplingLevel();
if (logger.isDebugEnabled())
logger.debug("index size for bloom filter calc for file : {} : {}", sstable.getFilename(), count);
}
@@ -192,7 +194,7 @@ public class SSTableReader extends SSTable implements Closeable
? new CompressedSegmentedFile.Builder()
: new BufferedSegmentedFile.Builder();
if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
- sstable.buildSummary(false, ibuilder, dbuilder, false);
+ sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
@@ -412,11 +414,16 @@ public class SSTableReader extends SSTable implements Closeable
if (readMeterSyncFuture != null)
readMeterSyncFuture.cancel(false);
- // Force finalizing mmapping if necessary
- ifile.cleanup();
- dfile.cleanup();
- // close the BF so it can be opened later.
- bf.close();
+ // if this SSTR was replaced by a new SSTR with a different index summary, the two instances will share
+ // resources, so don't force unmapping, clear the FileCacheService entry, or close the BF
+ if (!isReplaced.get())
+ {
+ // Force finalizing mmapping if necessary
+ ifile.cleanup();
+ dfile.cleanup();
+ // close the BF so it can be opened later.
+ bf.close();
+ }
indexSummary.close();
}
@@ -483,7 +490,7 @@ public class SSTableReader extends SSTable implements Closeable
boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
if (recreateBloomFilter || !summaryLoaded)
- buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+ buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
@@ -491,7 +498,7 @@ public class SSTableReader extends SSTable implements Closeable
saveSummary(ibuilder, dbuilder);
}
- private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
{
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
@@ -509,7 +516,7 @@ public class SSTableReader extends SSTable implements Closeable
IndexSummaryBuilder summaryBuilder = null;
if (!summaryLoaded)
- summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
+ summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval(), samplingLevel);
long indexPosition;
while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
@@ -555,7 +562,7 @@ public class SSTableReader extends SSTable implements Closeable
try
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
- indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner);
+ indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel);
if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
{
iStream.close();
@@ -587,15 +594,20 @@ public class SSTableReader extends SSTable implements Closeable
public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
+ saveSummary(ibuilder, dbuilder, indexSummary);
+ }
+
+ private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ {
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (summariesFile.exists())
- summariesFile.delete();
+ FileUtils.deleteWithConfirm(summariesFile);
DataOutputStream oStream = null;
try
{
oStream = new DataOutputStream(new FileOutputStream(summariesFile));
- IndexSummary.serializer.serialize(indexSummary, oStream);
+ IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
ByteBufferUtil.writeWithLength(first.key, oStream);
ByteBufferUtil.writeWithLength(last.key, oStream);
ibuilder.serializeBounds(oStream);
@@ -607,7 +619,7 @@ public class SSTableReader extends SSTable implements Closeable
// corrupted hence delete it and let it load it now.
if (summariesFile.exists())
- summariesFile.delete();
+ FileUtils.deleteWithConfirm(summariesFile);
}
finally
{
@@ -615,6 +627,81 @@ public class SSTableReader extends SSTable implements Closeable
}
}
+ /**
+ * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
+ * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
+ * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
+ * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
+ * @return a new SSTableReader
+ * @throws IOException
+ */
+ public SSTableReader cloneWithNewSummarySamplingLevel(int samplingLevel) throws IOException
+ {
+ IndexSummary newSummary;
+ if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, partitioner);
+
+ SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ saveSummary(ibuilder, dbuilder, newSummary);
+ }
+ else if (samplingLevel > indexSummary.getSamplingLevel())
+ {
+ newSummary = upsampleSummary(samplingLevel);
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level");
+ }
+
+ markReplaced();
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata);
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.first;
+ replacement.last = this.last;
+ return replacement;
+ }
+
+ private IndexSummary upsampleSummary(int newSamplingLevel) throws IOException
+ {
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ try
+ {
+ long indexSize = primaryIndex.length();
+ IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getIndexInterval(), newSamplingLevel);
+
+ long indexPosition;
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+ RowIndexEntry.serializer.skip(primaryIndex);
+ }
+
+ return summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(primaryIndex);
+ }
+ }
+
+ public int getIndexSummarySamplingLevel()
+ {
+ return indexSummary.getSamplingLevel();
+ }
+
+ public long getIndexSummaryOffHeapSize()
+ {
+ return indexSummary.getOffHeapSize();
+ }
+
public void releaseSummary() throws IOException
{
indexSummary.close();
@@ -627,22 +714,37 @@ public class SSTableReader extends SSTable implements Closeable
throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
}
- /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
+ /**
+ * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
+ * modulo downsampling of the index summary).
+ */
public long getIndexScanPosition(RowPosition key)
{
- int index = indexSummary.binarySearch(key);
- if (index < 0)
+ return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
+ }
+
+ public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+ {
+ if (binarySearchResult == -1)
+ return -1;
+ else
+ return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
+ }
+
+ private static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
+ {
+ if (binarySearchResult < 0)
{
// binary search gives us the first index _greater_ than the key searched for,
// i.e., its insertion position
- int greaterThan = (index + 1) * -1;
+ int greaterThan = (binarySearchResult + 1) * -1;
if (greaterThan == 0)
return -1;
- return indexSummary.getPosition(greaterThan - 1);
+ return greaterThan - 1;
}
else
{
- return indexSummary.getPosition(index);
+ return binarySearchResult;
}
}
@@ -681,7 +783,7 @@ public class SSTableReader extends SSTable implements Closeable
*/
public long estimatedKeys()
{
- return ((long) indexSummary.size()) * indexSummary.getIndexInterval();
+ return ((long) indexSummary.getMaxNumberOfEntries()) * indexSummary.getIndexInterval();
}
/**
@@ -694,20 +796,35 @@ public class SSTableReader extends SSTable implements Closeable
List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
- return Math.max(1, sampleKeyCount * indexSummary.getIndexInterval());
+
+ // adjust for the current sampling level
+ long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getIndexInterval()) / indexSummary.getSamplingLevel();
+ return Math.max(1, estimatedKeys);
}
/**
- * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
+ * Returns the number of entries in the IndexSummary. At full sampling, this is approximately 1/INDEX_INTERVALth of
+ * the keys in this SSTable.
*/
- public int getKeySampleSize()
+ public int getIndexSummarySize()
{
return indexSummary.size();
}
- public byte[] getKeySample(int position)
+ /**
+ * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
+ */
+ public int getMaxIndexSummarySize()
{
- return indexSummary.getKey(position);
+ return indexSummary.getMaxNumberOfEntries();
+ }
+
+ /**
+ * Returns the key for the index summary entry at `index`.
+ */
+ public byte[] getIndexSummaryKey(int index)
+ {
+ return indexSummary.getKey(index);
}
private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
@@ -935,25 +1052,28 @@ public class SSTableReader extends SSTable implements Closeable
}
}
- // next, see if the sampled index says it's impossible for the key to be present
- long sampledPosition = getIndexScanPosition(key);
- if (sampledPosition == -1)
+ // check the smallest and greatest keys in the sstable to see if it can't be present
+ if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
{
if (op == Operator.EQ && updateCacheAndStats)
bloomFilterTracker.addFalsePositive();
- // we matched the -1th position: if the operator might match forward, we'll start at the first
- // position. We however need to return the correct index entry for that first position.
- if (op.apply(1) >= 0)
- {
- sampledPosition = 0;
- }
- else
+
+ if (op.apply(1) < 0)
{
- Tracing.trace("Partition summary allows skipping sstable {}", descriptor.generation);
+ Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
return null;
}
}
+ int binarySearchResult = indexSummary.binarySearch(key);
+ long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+ int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
+ // if we matched the -1th position, we'll start at the first position
+ sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
+
+ int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
// scan the on-disk index, starting at the nearest sampled position.
// The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present
// (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
@@ -962,12 +1082,12 @@ public class SSTableReader extends SSTable implements Closeable
// of the next interval).
int i = 0;
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
- while (segments.hasNext() && i <= indexSummary.getIndexInterval())
+ while (segments.hasNext() && i <= effectiveInterval)
{
FileDataInput in = segments.next();
try
{
- while (!in.isEOF() && i <= indexSummary.getIndexInterval())
+ while (!in.isEOF() && i <= effectiveInterval)
{
i++;
@@ -1102,6 +1222,12 @@ public class SSTableReader extends SSTable implements Closeable
return dfile.onDiskLength;
}
+ public void markReplaced()
+ {
+ boolean success = isReplaced.compareAndSet(false, true);
+ assert success : "Attempted to mark an SSTableReader as replaced more than once";
+ }
+
public boolean acquireReference()
{
while (true)
@@ -1121,19 +1247,27 @@ public class SSTableReader extends SSTable implements Closeable
*/
public void releaseReference()
{
- if (references.decrementAndGet() == 0 && isCompacted.get())
+ if (references.decrementAndGet() == 0)
{
- /**
- * Make OS a favour and suggest (using fadvice call) that we
- * don't want to see pages of this SSTable in memory anymore.
- *
- * NOTE: We can't use madvice in java because it requires address of
- * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
- */
- dropPageCache();
-
FileUtils.closeQuietly(this);
- deletingTask.schedule();
+
+ // if this SSTR instance was replaced by another with a different index summary, let the new instance
+ // handle clearing the page cache and deleting the files
+ if (isCompacted.get())
+ {
+ assert !isReplaced.get();
+
+ /**
+ * Do the OS a favour and suggest (using fadvice call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvice in java because it requires the address of
+ * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+ */
+ dropPageCache();
+
+ deletingTask.schedule();
+ }
}
assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b5d50cf..5b02d37 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -414,7 +414,7 @@ public class SSTableWriter extends SSTable
indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
!metadata.populateIoCacheOnFlush());
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
- summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval());
+ summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 3410e9b..5dabb42 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -487,10 +487,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
initialized = true;
isClientMode = false;
- // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
try
{
+ // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
Class.forName("org.apache.cassandra.service.StorageProxy");
+ // also IndexSummaryManager, which is otherwise unreferenced
+ Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager");
}
catch (ClassNotFoundException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index ef20e64..1c13942 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -202,7 +202,8 @@ public class SchemaLoader
.compactionStrategyOptions(leveledOptions),
standardCFMD(ks1, "legacyleveled")
.compactionStrategyClass(LeveledCompactionStrategy.class)
- .compactionStrategyOptions(leveledOptions)));
+ .compactionStrategyOptions(leveledOptions),
+ standardCFMD(ks1, "StandardLowIndexInterval").indexInterval(8).caching(CFMetaData.Caching.NONE)));
// Keyspace 2
schema.add(KSMetaData.testMetadata(ks2,