Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/28 14:04:46 UTC

cassandra git commit: Make sure we compact highly overlapping cold sstables with STCS

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 325169e82 -> 9efa0173d


Make sure we compact highly overlapping cold sstables with STCS

Patch by marcuse; reviewed by carlyeks for CASSANDRA-8635


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9efa0173
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9efa0173
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9efa0173

Branch: refs/heads/cassandra-2.1
Commit: 9efa0173d0e621045f650e9a57a607d3c4c0bb50
Parents: 325169e
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jan 28 13:49:42 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 28 13:53:27 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../SizeTieredCompactionStrategy.java           | 77 +++++++++++++++++++-
 .../cassandra/io/sstable/ColumnNameHelper.java  | 22 ++++++
 .../cassandra/io/sstable/SSTableReader.java     | 50 +++++++++++++
 .../SizeTieredCompactionStrategyTest.java       | 12 +--
 .../cassandra/db/filter/ColumnSliceTest.java    | 14 ++++
 6 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b247127..ff6a26f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.3
+ * Make sure we compact highly overlapping cold sstables with
+   STCS (CASSANDRA-8635)
  * rpc_interface and listen_interface generate NPE on startup when specified interface doesn't exist (CASSANDRA-8677)
  * Fix ArrayIndexOutOfBoundsException in nodetool cfhistograms (CASSANDRA-8514)
  * Switch from yammer metrics for nodetool cf/proxy histograms (CASSANDRA-8662)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 4b44426..fbd715c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
@@ -80,7 +82,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         int maxThreshold = cfs.getMaximumCompactionThreshold();
 
         Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
-        candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+        candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit, cfs.getMinimumCompactionThreshold());
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
@@ -109,10 +111,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
      * across all sstables
      * @param sstables all sstables to consider
      * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+     * @param minThreshold min compaction threshold
      * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
      */
     @VisibleForTesting
-    static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+    static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit, int minThreshold)
     {
         if (coldReadsToOmit == 0.0)
             return sstables;
@@ -167,10 +170,78 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             totalColdReads += reads;
             cutoffIndex++;
         }
+        List<SSTableReader> hotSSTables = new ArrayList<>(sstables.subList(cutoffIndex, sstables.size()));
+        List<SSTableReader> coldSSTables = sstables.subList(0, cutoffIndex);
+        logger.debug("hotSSTables={}, coldSSTables={}", hotSSTables.size(), coldSSTables.size());
+        if (hotSSTables.size() >= minThreshold)
+            return hotSSTables;
+        if (coldSSTables.size() < minThreshold)
+            return Collections.emptyList();
+
+        Map<SSTableReader, Set<SSTableReader>> overlapMap = new HashMap<>();
+        for (int i = 0; i < coldSSTables.size(); i++)
+        {
+            SSTableReader sstable = coldSSTables.get(i);
+            Set<SSTableReader> overlaps = new HashSet<>();
+            for (int j = 0; j < coldSSTables.size(); j++)
+            {
+                SSTableReader innerSSTable = coldSSTables.get(j);
+                if (ColumnNameHelper.overlaps(sstable.getSSTableMetadata().minColumnNames,
+                                              sstable.getSSTableMetadata().maxColumnNames,
+                                              innerSSTable.getSSTableMetadata().minColumnNames,
+                                              innerSSTable.getSSTableMetadata().maxColumnNames,
+                                              sstable.metadata.comparator))
+                {
+                    overlaps.add(innerSSTable);
+                }
+            }
+            overlapMap.put(sstable, overlaps);
+        }
+        List<Set<SSTableReader>> overlapChains = new ArrayList<>();
+        for (SSTableReader sstable : overlapMap.keySet())
+            overlapChains.add(createOverlapChain(sstable, overlapMap));
+
+        Collections.sort(overlapChains, new Comparator<Set<SSTableReader>>()
+        {
+            @Override
+            public int compare(Set<SSTableReader> o1, Set<SSTableReader> o2)
+            {
+                return Longs.compare(SSTableReader.getTotalBytes(o2), SSTableReader.getTotalBytes(o1));
+            }
+        });
+        for (Set<SSTableReader> overlapping : overlapChains)
+        {
+            // if compacting these cold sstables together would keep less than 70% of their keys (i.e., they overlap heavily), compact them:
+            if (SSTableReader.estimateCompactionGain(overlapping) < 0.7)
+                return new ArrayList<>(overlapping);
+        }
+        return Collections.emptyList();
+    }
 
-        return sstables.subList(cutoffIndex, sstables.size());
+    /**
+     * Returns the set of all transitively overlapping sstables, starting from s.
+     * If we have 3 sstables a, b and c, where a overlaps with b but not with c, and b overlaps with c, all three sstables are returned.
+     *
+     * m maps each sstable to the set of sstables it overlaps with
+     */
+    private static Set<SSTableReader> createOverlapChain(SSTableReader s, Map<SSTableReader, Set<SSTableReader>> m)
+    {
+        Deque<SSTableReader> sstables = new ArrayDeque<>();
+        Set<SSTableReader> overlapChain = new HashSet<>();
+        sstables.push(s);
+        while (!sstables.isEmpty())
+        {
+            SSTableReader sstable = sstables.pop();
+            if (overlapChain.add(sstable))
+            {
+                if (m.containsKey(sstable))
+                    sstables.addAll(m.get(sstable));
+            }
+        }
+        return overlapChain;
     }
 
+
     /**
      * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
      * @param minThreshold minimum number of sstables in a bucket to qualify as interesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
index f74b86f..436975b 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
@@ -217,4 +217,26 @@ public class ColumnNameHelper
 
         return retList;
     }
+
+    /**
+     * Checks whether the given min/max column names could overlap (i.e., whether the sstables could share column names, judging only by their min/max column name bounds)
+     */
+    public static boolean overlaps(List<ByteBuffer> minColumnNames1, List<ByteBuffer> maxColumnNames1, List<ByteBuffer> minColumnNames2, List<ByteBuffer> maxColumnNames2, CellNameType comparator)
+    {
+        if (minColumnNames1.isEmpty() || maxColumnNames1.isEmpty() || minColumnNames2.isEmpty() || maxColumnNames2.isEmpty())
+            return true;
+
+        return !(compare(maxColumnNames1, minColumnNames2, comparator) < 0 || compare(minColumnNames1, maxColumnNames2, comparator) > 0);
+    }
+
+    private static int compare(List<ByteBuffer> columnNames1, List<ByteBuffer> columnNames2, CellNameType comparator)
+    {
+        for (int i = 0; i < Math.min(columnNames1.size(), columnNames2.size()); i++)
+        {
+            int cmp = comparator.subtype(i).compare(columnNames1.get(i), columnNames2.get(i));
+            if (cmp != 0)
+                return cmp;
+        }
+        return 0;
+    }
 }
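
The overlaps check added above is the standard interval test applied to the sstables' min/max column name bounds: two sstables can only be disjoint when one's max bound sorts strictly before the other's min bound, and missing bounds are conservatively treated as "could overlap". Reduced to plain integers (an illustrative sketch, not Cassandra code):

    public class BoundsOverlapSketch
    {
        // Hypothetical stand-in for ColumnNameHelper.overlaps, with int bounds
        // instead of clustering prefixes. The real method also returns true when
        // any of the bounds lists is empty, to stay on the safe side.
        static boolean boundsOverlap(int min1, int max1, int min2, int max2)
        {
            return !(max1 < min2 || min1 > max2);
        }

        public static void main(String[] args)
        {
            System.out.println(boundsOverlap(0, 3, 1, 2)); // true: second range contained in the first
            System.out.println(boundsOverlap(0, 3, 4, 5)); // false: disjoint
            System.out.println(boundsOverlap(0, 3, 3, 5)); // true: shared boundary value
        }
    }

For composite column names, the private compare helper only compares the components both bounds have in common, which is why bounds of different lengths, as in the last new ColumnSliceTest assertion, can still be checked against each other.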

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ee8b7c3..50bf3e3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cache.InstrumentingCache;
@@ -272,6 +274,54 @@ public class SSTableReader extends SSTable
         return count;
     }
 
+    /**
+     * Estimates what fraction of the keys would remain if the given sstables were compacted together
+     */
+    public static double estimateCompactionGain(Set<SSTableReader> overlapping)
+    {
+        Set<ICardinality> cardinalities = new HashSet<>(overlapping.size());
+        for (SSTableReader sstable : overlapping)
+        {
+            try
+            {
+                ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator;
+                if (cardinality != null)
+                    cardinalities.add(cardinality);
+                else
+                    logger.debug("Got a null cardinality estimator in: "+sstable.getFilename());
+            }
+            catch (IOException e)
+            {
+                logger.warn("Could not read compaction metadata for " + sstable, e);
+            }
+        }
+        long totalKeyCountBefore = 0;
+        for (ICardinality cardinality : cardinalities)
+        {
+            totalKeyCountBefore += cardinality.cardinality();
+        }
+        if (totalKeyCountBefore == 0)
+            return 1;
+
+        long totalKeyCountAfter = mergeCardinalities(cardinalities).cardinality();
+        logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, ((double)totalKeyCountAfter)/totalKeyCountBefore);
+        return ((double)totalKeyCountAfter)/totalKeyCountBefore;
+    }
+
+    private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities)
+    {
+        ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality
+        try
+        {
+            base = base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()]));
+        }
+        catch (CardinalityMergeException e)
+        {
+            logger.warn("Could not merge cardinalities", e);
+        }
+        return base;
+    }
+
     public static SSTableReader open(Descriptor descriptor) throws IOException
     {
         CFMetaData metadata;
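
estimateCompactionGain above leans on the per-sstable key cardinality estimators stored in the compaction metadata: if the merged estimate is much smaller than the sum of the individual estimates, the sstables share many keys and compacting them should shrink the data considerably. A rough sketch of that arithmetic using the same stream-lib classes (the key counts and the 90% overlap below are made up for illustration):

    import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

    public class CompactionGainSketch
    {
        public static void main(String[] args) throws CardinalityMergeException
        {
            // same precision parameters the patch uses (see MetadataCollector.cardinality)
            HyperLogLogPlus first = new HyperLogLogPlus(13, 25);
            HyperLogLogPlus second = new HyperLogLogPlus(13, 25);
            for (int i = 0; i < 10000; i++)
            {
                first.offer("key" + i);
                second.offer("key" + (i + 1000)); // ~90% of these keys also exist in 'first'
            }
            long before = first.cardinality() + second.cardinality(); // roughly 20000
            long after = first.merge(second).cardinality();           // roughly 11000 distinct keys
            // with this much overlap the ratio lands well below the 0.7 cutoff used by STCS above
            System.out.println("estimated gain: " + (double) after / before);
        }
    }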

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 6132dad..d9bf017 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -216,17 +216,17 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
 
         for (SSTableReader sstr : sstrs)
             sstr.readMeter = null;
-        filtered = filterColdSSTables(sstrs, 0.05);
+        filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
 
         for (SSTableReader sstr : sstrs)
             sstr.readMeter = new RestorableMeter(0.0, 0.0);
-        filtered = filterColdSSTables(sstrs, 0.05);
+        filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
 
         // leave all read rates at 0 besides one
         sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
-        filtered = filterColdSSTables(sstrs, 0.05);
+        filtered = filterColdSSTables(sstrs, 0.05, 0);
         assertEquals("there should only be one hot sstable", 1, filtered.size());
         assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
 
@@ -239,20 +239,20 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
         sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
         sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
 
-        filtered = filterColdSSTables(sstrs, 0.025);
+        filtered = filterColdSSTables(sstrs, 0.025, 0);
         assertEquals(2, filtered.size());
         assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
 
         // make sure a threshold of 0.0 doesn't result in any sstables being filtered
         for (SSTableReader sstr : sstrs)
             sstr.readMeter = new RestorableMeter(1.0, 1.0);
-        filtered = filterColdSSTables(sstrs, 0.0);
+        filtered = filterColdSSTables(sstrs, 0.0, 0);
         assertEquals(sstrs.size(), filtered.size());
 
         // just for fun, set a threshold where all sstables are considered cold
         for (SSTableReader sstr : sstrs)
             sstr.readMeter = new RestorableMeter(1.0, 1.0);
-        filtered = filterColdSSTables(sstrs, 1.0);
+        filtered = filterColdSSTables(sstrs, 1.0, 0);
         assertTrue(filtered.isEmpty());
     }
 }
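
A note on the trailing 0 passed to filterColdSSTables in these tests: with minThreshold = 0 the check hotSSTables.size() >= minThreshold can never fail, so the method returns the hot set exactly as it did before this patch and the new overlap/cardinality path is never taken. Annotated repeat of one of the calls above (a comment on the existing assertion, not a new test):

    // minThreshold == 0 bypasses the cold-sstable overlap handling entirely;
    // passing the table's real minimum compaction threshold (4 by default)
    // would instead allow highly overlapping cold sstables to be returned
    // when there are too few hot ones.
    filtered = filterColdSSTables(sstrs, 0.05, 0);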

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9efa0173/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
index f1be21c..8ba2665 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnSliceTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.*;
@@ -344,6 +345,19 @@ public class ColumnSliceTest
         slice = new ColumnSlice(composite(0), composite(0, 1, 2));
         assertFalse(slice.intersects(columnNames(1), columnNames(1, 2), nameType, false));
     }
+    @Test
+    public void testColumnNameHelper()
+    {
+        List<AbstractType<?>> types = new ArrayList<>();
+        types.add(Int32Type.instance);
+        types.add(Int32Type.instance);
+        types.add(Int32Type.instance);
+        CompoundDenseCellNameType nameType = new CompoundDenseCellNameType(types);
+        assertTrue(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(1, 1, 1), columnNames(2, 2, 2), nameType));
+        assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(4, 4, 4), columnNames(5, 5, 5), nameType));
+        assertFalse(ColumnNameHelper.overlaps(columnNames(0, 0, 0), columnNames(3, 3, 3), columnNames(3, 3, 4), columnNames(5, 5, 5), nameType));
+        assertTrue(ColumnNameHelper.overlaps(columnNames(0), columnNames(3, 3, 3), columnNames(1, 1), columnNames(5), nameType));
+    }
 
     @Test
     public void testDeoverlapSlices()