You are viewing a plain text version of this content. The canonical link to it (shown as "here" on the original page) was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/28 14:04:46 UTC
cassandra git commit: Make sure we compact highly overlapping cold sstables with STCS
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 325169e82 -> 9efa0173d
Make sure we compact highly overlapping cold sstables with STCS
Patch by marcuse; reviewed by carlyeks for CASSANDRA-8635
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9efa0173
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9efa0173
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9efa0173
Branch: refs/heads/cassandra-2.1
Commit: 9efa0173d0e621045f650e9a57a607d3c4c0bb50
Parents: 325169e
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jan 28 13:49:42 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 28 13:53:27 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../SizeTieredCompactionStrategy.java | 77 +++++++++++++++++++-
.../cassandra/io/sstable/ColumnNameHelper.java | 22 ++++++
.../cassandra/io/sstable/SSTableReader.java | 50 +++++++++++++
.../SizeTieredCompactionStrategyTest.java | 12 +--
.../cassandra/db/filter/ColumnSliceTest.java | 14 ++++
6 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b247127..ff6a26f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.3
+ * Make sure we compact highly overlapping cold sstables with
+ STCS (CASSANDRA-8635)
* rpc_interface and listen_interface generate NPE on startup when specified interface doesn't exist (CASSANDRA-8677)
* Fix ArrayIndexOutOfBoundsException in nodetool cfhistograms (CASSANDRA-8514)
* Switch from yammer metrics for nodetool cf/proxy histograms (CASSANDRA-8662)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 4b44426..fbd715c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
@@ -80,7 +82,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
int maxThreshold = cfs.getMaximumCompactionThreshold();
Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
- candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+ candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit, cfs.getMinimumCompactionThreshold());
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
logger.debug("Compaction buckets are {}", buckets);
@@ -109,10 +111,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
* across all sstables
* @param sstables all sstables to consider
* @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+ * @param minThreshold min compaction threshold
* @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
*/
@VisibleForTesting
- static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+ static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit, int minThreshold)
{
if (coldReadsToOmit == 0.0)
return sstables;
@@ -167,10 +170,78 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
totalColdReads += reads;
cutoffIndex++;
}
+ List<SSTableReader> hotSSTables = new ArrayList<>(sstables.subList(cutoffIndex, sstables.size()));
+ List<SSTableReader> coldSSTables = sstables.subList(0, cutoffIndex);
+ logger.debug("hotSSTables={}, coldSSTables={}", hotSSTables.size(), coldSSTables.size());
+ if (hotSSTables.size() >= minThreshold)
+ return hotSSTables;
+ if (coldSSTables.size() < minThreshold)
+ return Collections.emptyList();
+
+ Map<SSTableReader, Set<SSTableReader>> overlapMap = new HashMap<>();
+ for (int i = 0; i < coldSSTables.size(); i++)
+ {
+ SSTableReader sstable = coldSSTables.get(i);
+ Set<SSTableReader> overlaps = new HashSet<>();
+ for (int j = 0; j < coldSSTables.size(); j++)
+ {
+ SSTableReader innerSSTable = coldSSTables.get(j);
+ if (ColumnNameHelper.overlaps(sstable.getSSTableMetadata().minColumnNames,
+ sstable.getSSTableMetadata().maxColumnNames,
+ innerSSTable.getSSTableMetadata().minColumnNames,
+ innerSSTable.getSSTableMetadata().maxColumnNames,
+ sstable.metadata.comparator))
+ {
+ overlaps.add(innerSSTable);
+ }
+ }
+ overlapMap.put(sstable, overlaps);
+ }
+ List<Set<SSTableReader>> overlapChains = new ArrayList<>();
+ for (SSTableReader sstable : overlapMap.keySet())
+ overlapChains.add(createOverlapChain(sstable, overlapMap));
+
+ Collections.sort(overlapChains, new Comparator<Set<SSTableReader>>()
+ {
+ @Override
+ public int compare(Set<SSTableReader> o1, Set<SSTableReader> o2)
+ {
+ return Longs.compare(SSTableReader.getTotalBytes(o2), SSTableReader.getTotalBytes(o1));
+ }
+ });
+ for (Set<SSTableReader> overlapping : overlapChains)
+ {
+ // if we are expecting to only keep 70% of the keys after a compaction, run a compaction on these cold sstables:
+ if (SSTableReader.estimateCompactionGain(overlapping) < 0.7)
+ return new ArrayList<>(overlapping);
+ }
+ return Collections.emptyList();
+ }
- return sstables.subList(cutoffIndex, sstables.size());
+    /**
+     * Collects every sstable reachable from {@code s} through the overlap relation in {@code m},
+     * i.e. the connected component that contains {@code s} (and {@code s} itself).
+     * Example: with sstables a, b and c where a overlaps b and b overlaps c, but a does not
+     * overlap c, all three sstables are returned.
+     *
+     * @param s the sstable to start the walk from
+     * @param m maps an sstable to all sstables it overlaps; an sstable absent from the map
+     *          contributes no further neighbours
+     * @return the set of sstables transitively overlapping {@code s}
+     */
+    private static Set<SSTableReader> createOverlapChain(SSTableReader s, Map<SSTableReader, Set<SSTableReader>> m)
+    {
+        Set<SSTableReader> chain = new HashSet<>();
+        Deque<SSTableReader> pending = new ArrayDeque<>();
+        pending.add(s);
+        // ArrayDeque prohibits null elements, so poll() returning null means the work list is exhausted
+        for (SSTableReader current = pending.poll(); current != null; current = pending.poll())
+        {
+            // an sstable already in the chain had its neighbours queued earlier; do not expand it twice
+            if (!chain.add(current) || !m.containsKey(current))
+                continue;
+            pending.addAll(m.get(current));
+        }
+        return chain;
     }
+
/**
* @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
* @param minThreshold minimum number of sstables in a bucket to qualify as interesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
index f74b86f..436975b 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
@@ -217,4 +217,26 @@ public class ColumnNameHelper
return retList;
}
+
+ /**
+ * Checks if the given min/max column names could overlap (i.e they could share some column names based on the max/min column names in the sstables)
+ */
+ public static boolean overlaps(List<ByteBuffer> minColumnNames1, List<ByteBuffer> maxColumnNames1, List<ByteBuffer> minColumnNames2, List<ByteBuffer> maxColumnNames2, CellNameType comparator)
+ {
+ if (minColumnNames1.isEmpty() || maxColumnNames1.isEmpty() || minColumnNames2.isEmpty() || maxColumnNames2.isEmpty())
+ return true;
+
+ return !(compare(maxColumnNames1, minColumnNames2, comparator) < 0 || compare(minColumnNames1, maxColumnNames2, comparator) > 0);
+ }
+
+ private static int compare(List<ByteBuffer> columnNames1, List<ByteBuffer> columnNames2, CellNameType comparator)
+ {
+ for (int i = 0; i < Math.min(columnNames1.size(), columnNames2.size()); i++)
+ {
+ int cmp = comparator.subtype(i).compare(columnNames1.get(i), columnNames2.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ee8b7c3..50bf3e3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -54,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cache.InstrumentingCache;
@@ -272,6 +274,54 @@ public class SSTableReader extends SSTable
return count;
}
+    /**
+     * Estimates the fraction of keys that would be kept if the given sstables were compacted together.
+     * The estimate is computed as:
+     * (distinct keys of the merged cardinality estimators) / (sum of the per-sstable estimates).
+     * Smaller values mean more overlap, i.e. a bigger gain from compacting.
+     *
+     * Sstables whose compaction metadata cannot be read, or whose estimator is null, are skipped.
+     *
+     * @param overlapping the sstables that would be compacted together
+     * @return the estimated kept fraction, or 1 (no known gain) when nothing can be estimated:
+     *         no usable estimators, a zero key count, or estimators that cannot be merged
+     */
+    public static double estimateCompactionGain(Set<SSTableReader> overlapping)
+    {
+        // A List rather than a HashSet: every sstable's estimator must contribute to the
+        // "before" sum, regardless of how ICardinality implements equals/hashCode.
+        List<ICardinality> cardinalities = new ArrayList<>(overlapping.size());
+        for (SSTableReader sstable : overlapping)
+        {
+            try
+            {
+                ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator;
+                if (cardinality != null)
+                    cardinalities.add(cardinality);
+                else
+                    logger.debug("Got a null cardinality estimator in: {}", sstable.getFilename());
+            }
+            catch (IOException e)
+            {
+                logger.warn("Could not read up compaction metadata for " + sstable, e);
+            }
+        }
+        long totalKeyCountBefore = 0;
+        for (ICardinality cardinality : cardinalities)
+            totalKeyCountBefore += cardinality.cardinality();
+        if (totalKeyCountBefore == 0)
+            return 1;
+
+        ICardinality merged = mergeCardinalities(cardinalities);
+        // A failed merge means the gain is unknown. Report "no gain" rather than the empty
+        // estimator's 0: callers treat small values as a strong reason to compact.
+        if (merged == null)
+            return 1;
+
+        long totalKeyCountAfter = merged.cardinality();
+        double gain = ((double) totalKeyCountAfter) / totalKeyCountBefore;
+        logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, gain);
+        return gain;
+    }
+
+    /**
+     * Merges the given estimators into a fresh HyperLogLogPlus. Its parameters must match the ones
+     * used when the per-sstable estimators were built.
+     *
+     * @param cardinalities the per-sstable estimators to merge
+     * @return the merged estimator, or {@code null} if the estimators could not be merged
+     */
+    private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities)
+    {
+        ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality
+        try
+        {
+            return base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()]));
+        }
+        catch (CardinalityMergeException e)
+        {
+            logger.warn("Could not merge cardinalities", e);
+            return null;
+        }
+    }
+
public static SSTableReader open(Descriptor descriptor) throws IOException
{
CFMetaData metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 6132dad..d9bf017 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -216,17 +216,17 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
for (SSTableReader sstr : sstrs)
sstr.readMeter = null;
- filtered = filterColdSSTables(sstrs, 0.05);
+ filtered = filterColdSSTables(sstrs, 0.05, 0);
assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
for (SSTableReader sstr : sstrs)
sstr.readMeter = new RestorableMeter(0.0, 0.0);
- filtered = filterColdSSTables(sstrs, 0.05);
+ filtered = filterColdSSTables(sstrs, 0.05, 0);
assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
// leave all read rates at 0 besides one
sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
- filtered = filterColdSSTables(sstrs, 0.05);
+ filtered = filterColdSSTables(sstrs, 0.05, 0);
assertEquals("there should only be one hot sstable", 1, filtered.size());
assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
@@ -239,20 +239,20 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
- filtered = filterColdSSTables(sstrs, 0.025);
+ filtered = filterColdSSTables(sstrs, 0.025, 0);
assertEquals(2, filtered.size());
assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
// make sure a threshold of 0.0 doesn't result in any sstables being filtered
for (SSTableReader sstr : sstrs)
sstr.readMeter = new RestorableMeter(1.0, 1.0);
- filtered = filterColdSSTables(sstrs, 0.0);
+ filtered = filterColdSSTables(sstrs, 0.0, 0);
assertEquals(sstrs.size(), filtered.size());
// just for fun, set a threshold where all sstables are considered cold
for (SSTableReader sstr : sstrs)
sstr.readMeter = new RestorableMeter(1.0, 1.0);
- filtered = filterColdSSTables(sstrs, 1.0);
+ filtered = filterColdSSTables(sstrs, 1.0, 0);
assertTrue(filtered.isEmpty());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
index f1be21c..8ba2665 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.*;
@@ -344,6 +345,19 @@ public class ColumnSliceTest
slice = new ColumnSlice(composite(0), composite(0, 1, 2));
assertFalse(slice.intersects(columnNames(1), columnNames(1, 2), nameType, false));
}
+    /**
+     * Checks ColumnNameHelper.overlaps on three-component Int32 column names.
+     * Cases covered: containment, disjoint ranges, ranges whose bounds have different lengths,
+     * and the same cases with the two ranges swapped. The swapped cases exercise the
+     * "min1 > max2" branch and check that overlap is symmetric.
+     */
+    @Test
+    public void testColumnNameHelper()
+    {
+        List<AbstractType<?>> types = new ArrayList<>();
+        types.add(Int32Type.instance);
+        types.add(Int32Type.instance);
+        types.add(Int32Type.instance);
+        CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
+
+        // second range contained in the first
+        assertTrue(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(1, 1, 1), columnNames(2, 2, 2), nameType));
+        // second range strictly after the first (max1 < min2)
+        assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(4, 4, 4), columnNames(5, 5, 5), nameType));
+        assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(3, 3, 4), columnNames(5, 5, 5), nameType));
+        // bounds of differing lengths: only the common prefix is compared
+        assertTrue(ColumnNameHelper.overlaps(columnNames(0), columnNames(3, 3, 3), columnNames(1, 1), columnNames(5), nameType));
+
+        // same cases with the ranges swapped: overlap must be symmetric, and the
+        // disjoint cases now go through the min1 > max2 branch
+        assertTrue(ColumnNameHelper.overlaps(columnNames(1, 1, 1), columnNames(2, 2, 2), columnNames(0, 0, 0), columnNames(3, 3, 3), nameType));
+        assertFalse(ColumnNameHelper.overlaps(columnNames(4, 4, 4), columnNames(5, 5, 5), columnNames(0, 0, 0), columnNames(3, 3, 3), nameType));
+        assertFalse(ColumnNameHelper.overlaps(columnNames(3, 3, 4), columnNames(5, 5, 5), columnNames(0, 0, 0), columnNames(3, 3, 3), nameType));
+        assertTrue(ColumnNameHelper.overlaps(columnNames(1, 1), columnNames(5), columnNames(0), columnNames(3, 3, 3), nameType));
+    }
@Test
public void testDeoverlapSlices()