Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/11 17:45:16 UTC

[1/4] cassandra git commit: Allow cancellation of index summary redistribution

Repository: cassandra
Updated Branches:
  refs/heads/trunk 2cd18ef5a -> 3f79c5baa


Allow cancellation of index summary redistribution

Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-8805
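
In short: the redistribution work moves out of IndexSummaryManager into a new
IndexSummaryRedistribution task that extends CompactionInfo.Holder, is registered
with CompactionMetrics while it runs, and polls isStopRequested() at its loop
boundaries, throwing CompactionInterruptedException once a stop has been
requested. A minimal sketch of how a caller cancels it, grounded in the unit test
added below (reaching the same code path via `nodetool stop INDEX_SUMMARY` is an
assumption, not something this diff shows):

    // Cancel a running index summary redistribution through the
    // CompactionManager MBean, exactly as the new test does.
    CompactionManager.instance.stopCompaction("INDEX_SUMMARY");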


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc7075a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc7075a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc7075a4

Branch: refs/heads/trunk
Commit: fc7075a41837301f3866333e0eb5c464715d888c
Parents: 95dab27
Author: Carl Yeksigian <ca...@apache.org>
Authored: Tue Dec 8 12:22:25 2015 -0500
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:20:18 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  14 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 265 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 338 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  69 +++-
 7 files changed, 435 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46cda65..2ee8b07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index d086eef..e88143e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -98,9 +98,17 @@ public final class CompactionInfo implements Serializable
     public String toString()
     {
         StringBuilder buff = new StringBuilder();
-        buff.append(getTaskType()).append('@').append(getId());
-        buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
-        buff.append(", ").append(getCompleted()).append('/').append(getTotal());
+        buff.append(getTaskType());
+        if (cfm != null)
+        {
+            buff.append('@').append(getId()).append('(');
+            buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", ");
+        }
+        else
+        {
+            buff.append('(');
+        }
+        buff.append(getCompleted()).append('/').append(getTotal());
         return buff.append(')').append(unit).toString();
     }
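
The null check matters because the new IndexSummaryRedistribution task is not
tied to a single table: its CompactionInfo is built without table metadata (cfm
is null; see getCompactionInfo() in the new file below), so toString() now skips
the id/keyspace/table portion. Illustrative output under that assumption, with
made-up numbers:

    Compaction@<cfId>(Keyspace1, Standard1, 1048576/4194304)bytes   <- cfm != null
    Index summary redistribution(1048576/4194304)bytes              <- cfm == null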
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2630ba2..9bddaf5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1222,6 +1222,20 @@ public class CompactionManager implements CompactionManagerMBean
         return executor.submit(runnable);
     }
 
+    public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+    {
+        metrics.beginCompaction(redistribution);
+
+        try
+        {
+            return redistribution.redistributeSummaries();
+        }
+        finally
+        {
+            metrics.finishCompaction(redistribution);
+        }
+    }
+
     static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
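
The new entry point wraps the task in beginCompaction()/finishCompaction() so it
shows up in CompactionMetrics.getCompactions() and is therefore visible to
stopCompaction(). Its only caller in this patch is the slimmed-down
IndexSummaryManager method further down; roughly:

    // from IndexSummaryManager.redistributeSummaries(...)
    IndexSummaryRedistribution redistribution =
        new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes);
    List<SSTableReader> newSSTables =
        CompactionManager.instance.runIndexSummaryRedistribution(redistribution);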

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 15d18f6..475b591 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -31,7 +31,8 @@ public enum OperationType
     /** Compaction for tombstone removal */
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unknown compaction type"),
-    ANTICOMPACTION("Anticompaction after repair");
+    ANTICOMPACTION("Anticompaction after repair"),
+    INDEX_SUMMARY("Index summary redistribution");
 
     private final String type;
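
A note on the two names (the lookup is an assumption from the surrounding
codebase, not shown in this diff): the constant name is what the
CompactionManagerMBean stopCompaction(String) argument is matched against, while
the string is the human-readable label used in logs and compaction status output:

    // assumption: stopCompaction(String) resolves its argument via valueOf
    OperationType t = OperationType.valueOf("INDEX_SUMMARY");
    String label = t.toString(); // "Index summary redistribution", assuming toString() returns `type`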
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 0c196ff..be5cc3c 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +31,6 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -45,11 +42,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-
 /**
  * Manages the fixed-size memory pool for index summaries, periodically resizing them
  * in order to give more memory to hot sstables and less memory to cold sstables.
@@ -255,261 +251,6 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     @VisibleForTesting
     public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
     {
-        long total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
-            total += sstable.getIndexSummaryOffHeapSize();
-
-        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
-        for (SSTableReader sstable : nonCompacting)
-        {
-            // We can't change the sampling level of sstables with the old format, because the serialization format
-            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
-            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
-            if (!sstable.descriptor.version.hasSamplingLevel)
-                oldFormatSSTables.add(sstable);
-        }
-        nonCompacting.removeAll(oldFormatSSTables);
-
-        logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
-                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
-
-        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
-        double totalReadsPerSec = 0.0;
-        for (SSTableReader sstable : nonCompacting)
-        {
-            if (sstable.getReadMeter() != null)
-            {
-                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
-                totalReadsPerSec += readRate;
-                readRates.put(sstable, readRate);
-            }
-        }
-        logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
-
-        // copy and sort by read rates (ascending)
-        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
-        Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
-
-        long remainingBytes = memoryPoolBytes;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
-            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
-
-        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
-                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
-        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
-
-        total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
-            total += sstable.getIndexSummaryOffHeapSize();
-        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
-                     total / 1024.0 / 1024.0);
-
-        return newSSTables;
-    }
-
-    private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
-                                                            double totalReadsPerSec, long memoryPoolCapacity) throws IOException
-    {
-
-        List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
-        List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
-        List<ResampleEntry> forceResample = new ArrayList<>();
-        List<ResampleEntry> forceUpsample = new ArrayList<>();
-        List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
-
-        // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
-        // to the number of total reads/sec it handles.
-        long remainingSpace = memoryPoolCapacity;
-        for (SSTableReader sstable : sstables)
-        {
-            int minIndexInterval = sstable.metadata.getMinIndexInterval();
-            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
-
-            double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
-            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
-
-            // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
-            int currentNumEntries = sstable.getIndexSummarySize();
-            double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
-            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
-            int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
-            int maxSummarySize = sstable.getMaxIndexSummarySize();
-
-            // if the min_index_interval changed, calculate what our current sampling level would be under the new min
-            if (sstable.getMinIndexInterval() != minIndexInterval)
-            {
-                int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
-                maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
-                logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
-                             sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
-                currentSamplingLevel = effectiveSamplingLevel;
-            }
-
-            int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
-                    minIndexInterval, maxIndexInterval);
-            int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
-            double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
-
-            logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
-                    "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
-                    sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
-                    currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
-                    numEntriesAtNewSamplingLevel * avgEntrySize);
-
-            if (effectiveIndexInterval < minIndexInterval)
-            {
-                // The min_index_interval was changed; re-sample to match it.
-                logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
-                        sstable, effectiveIndexInterval, minIndexInterval);
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= spaceUsed;
-            }
-            else if (effectiveIndexInterval > maxIndexInterval)
-            {
-                // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
-                logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
-                        sstable, effectiveIndexInterval, maxIndexInterval);
-                newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
-                numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-            }
-            else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
-            {
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-            }
-            else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
-            {
-                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                remainingSpace -= spaceUsed;
-            }
-            else
-            {
-                // keep the same sampling level
-                logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
-                remainingSpace -= sstable.getIndexSummaryOffHeapSize();
-                newSSTables.add(sstable);
-            }
-            totalReadsPerSec -= readsPerSec;
-        }
-
-        if (remainingSpace > 0)
-        {
-            Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
-            toDownsample = result.right;
-            newSSTables.addAll(result.left);
-        }
-
-        // downsample first, then upsample
-        toDownsample.addAll(forceResample);
-        toDownsample.addAll(toUpsample);
-        toDownsample.addAll(forceUpsample);
-        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
-        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
-        for (ResampleEntry entry : toDownsample)
-        {
-            SSTableReader sstable = entry.sstable;
-            logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
-                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
-                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
-            ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
-            DataTracker tracker = cfs.getDataTracker();
-
-            replacedByTracker.put(tracker, sstable);
-            replacementsByTracker.put(tracker, replacement);
-        }
-
-        for (DataTracker tracker : replacedByTracker.keySet())
-        {
-            tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
-            newSSTables.addAll(replacementsByTracker.get(tracker));
-        }
-
-        return newSSTables;
-    }
-
-    @VisibleForTesting
-    static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
-    {
-        // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
-        // that will make little difference.
-        Collections.sort(toDownsample, new Comparator<ResampleEntry>()
-        {
-            public int compare(ResampleEntry o1, ResampleEntry o2)
-            {
-                return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
-                                      o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
-            }
-        });
-
-        int noDownsampleCutoff = 0;
-        List<SSTableReader> willNotDownsample = new ArrayList<>();
-        while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
-        {
-            ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
-
-            long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
-            // see if we have enough leftover space to keep the current sampling level
-            if (extraSpaceRequired <= remainingSpace)
-            {
-                logger.trace("Using leftover space to keep {} at the current sampling level ({})",
-                             entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
-                willNotDownsample.add(entry.sstable);
-                remainingSpace -= extraSpaceRequired;
-            }
-            else
-            {
-                break;
-            }
-
-            noDownsampleCutoff++;
-        }
-        return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
-    }
-
-    private static class ResampleEntry
-    {
-        public final SSTableReader sstable;
-        public final long newSpaceUsed;
-        public final int newSamplingLevel;
-
-        public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
-        {
-            this.sstable = sstable;
-            this.newSpaceUsed = newSpaceUsed;
-            this.newSamplingLevel = newSamplingLevel;
-        }
-    }
-
-    /** Utility class for sorting sstables by their read rates. */
-    private static class ReadRateComparator implements Comparator<SSTableReader>
-    {
-        private final Map<SSTableReader, Double> readRates;
-
-        public ReadRateComparator(Map<SSTableReader, Double> readRates)
-        {
-            this.readRates = readRates;
-        }
-
-        @Override
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            Double readRate1 = readRates.get(o1);
-            Double readRate2 = readRates.get(o2);
-            if (readRate1 == null && readRate2 == null)
-                return 0;
-            else if (readRate1 == null)
-                return -1;
-            else if (readRate2 == null)
-                return 1;
-            else
-                return Double.compare(readRate1, readRate2);
-        }
+        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
new file mode 100644
index 0000000..adb3e4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+public class IndexSummaryRedistribution extends CompactionInfo.Holder
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
+    private final List<SSTableReader> compacting;
+    private final List<SSTableReader> nonCompacting;
+    private final long memoryPoolBytes;
+    private volatile long remainingSpace;
+
+    public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes)
+    {
+        this.compacting = compacting;
+        this.nonCompacting = nonCompacting;
+        this.memoryPoolBytes = memoryPoolBytes;
+    }
+
+    public List<SSTableReader> redistributeSummaries() throws IOException
+    {
+        long total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+        for (SSTableReader sstable : nonCompacting)
+        {
+            // We can't change the sampling level of sstables with the old format, because the serialization format
+            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+            if (!sstable.descriptor.version.hasSamplingLevel)
+                oldFormatSSTables.add(sstable);
+        }
+        nonCompacting.removeAll(oldFormatSSTables);
+
+        logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+        double totalReadsPerSec = 0.0;
+        for (SSTableReader sstable : nonCompacting)
+        {
+            if (isStopRequested())
+                throw new CompactionInterruptedException(getCompactionInfo());
+
+            if (sstable.getReadMeter() != null)
+            {
+                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                totalReadsPerSec += readRate;
+                readRates.put(sstable, readRate);
+            }
+        }
+        logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+        // copy and sort by read rates (ascending)
+        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+        Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+        long remainingBytes = memoryPoolBytes;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+
+        total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+            total += sstable.getIndexSummaryOffHeapSize();
+        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
+                     total / 1024.0 / 1024.0);
+
+        return newSSTables;
+    }
+
+    private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+                                                     double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+    {
+
+        List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+        List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+        List<ResampleEntry> forceResample = new ArrayList<>();
+        List<ResampleEntry> forceUpsample = new ArrayList<>();
+        List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+        // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+        // to the number of total reads/sec it handles.
+        remainingSpace = memoryPoolCapacity;
+        for (SSTableReader sstable : sstables)
+        {
+            if (isStopRequested())
+                throw new CompactionInterruptedException(getCompactionInfo());
+
+            int minIndexInterval = sstable.metadata.getMinIndexInterval();
+            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+
+            double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+            long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+            // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+            int currentNumEntries = sstable.getIndexSummarySize();
+            double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+            int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+            int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+            // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+            if (sstable.getMinIndexInterval() != minIndexInterval)
+            {
+                int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+                maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+                logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+                             sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+                currentSamplingLevel = effectiveSamplingLevel;
+            }
+
+            int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+                                                                              minIndexInterval, maxIndexInterval);
+            int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+            double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+            logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
+                         "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
+                         sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
+                         currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+                         numEntriesAtNewSamplingLevel * avgEntrySize);
+
+            if (effectiveIndexInterval < minIndexInterval)
+            {
+                // The min_index_interval was changed; re-sample to match it.
+                logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
+                             sstable, effectiveIndexInterval, minIndexInterval);
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= spaceUsed;
+            }
+            else if (effectiveIndexInterval > maxIndexInterval)
+            {
+                // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+                logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+                             sstable, effectiveIndexInterval, maxIndexInterval);
+                newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+                numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+            }
+            else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+            }
+            else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+            {
+                long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                remainingSpace -= spaceUsed;
+            }
+            else
+            {
+                // keep the same sampling level
+                logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                newSSTables.add(sstable);
+            }
+            totalReadsPerSec -= readsPerSec;
+        }
+
+        if (remainingSpace > 0)
+        {
+            Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+            toDownsample = result.right;
+            newSSTables.addAll(result.left);
+        }
+
+        // downsample first, then upsample
+        toDownsample.addAll(forceResample);
+        toDownsample.addAll(toUpsample);
+        toDownsample.addAll(forceUpsample);
+        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+
+        try
+        {
+            for (ResampleEntry entry : toDownsample)
+            {
+                if (isStopRequested())
+                    throw new CompactionInterruptedException(getCompactionInfo());
+
+                SSTableReader sstable = entry.sstable;
+                logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+                             sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+                             entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+                ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                DataTracker tracker = cfs.getDataTracker();
+                SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+                newSSTables.add(replacement);
+                replacedByTracker.put(tracker, sstable);
+                replacementsByTracker.put(tracker, replacement);
+            }
+        }
+        finally
+        {
+            for (DataTracker tracker : replacedByTracker.keySet())
+                tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+        }
+
+        return newSSTables;
+    }
+
+    @VisibleForTesting
+    static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+    {
+        // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+        // that will make little difference.
+        Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+        {
+            public int compare(ResampleEntry o1, ResampleEntry o2)
+            {
+                return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                      o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+            }
+        });
+
+        int noDownsampleCutoff = 0;
+        List<SSTableReader> willNotDownsample = new ArrayList<>();
+        while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+        {
+            ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+            long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+            // see if we have enough leftover space to keep the current sampling level
+            if (extraSpaceRequired <= remainingSpace)
+            {
+                logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                             entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                willNotDownsample.add(entry.sstable);
+                remainingSpace -= extraSpaceRequired;
+            }
+            else
+            {
+                break;
+            }
+
+            noDownsampleCutoff++;
+        }
+        return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes");
+    }
+
+    /** Utility class for sorting sstables by their read rates. */
+    private static class ReadRateComparator implements Comparator<SSTableReader>
+    {
+        private final Map<SSTableReader, Double> readRates;
+
+        ReadRateComparator(Map<SSTableReader, Double> readRates)
+        {
+            this.readRates = readRates;
+        }
+
+        @Override
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            Double readRate1 = readRates.get(o1);
+            Double readRate2 = readRates.get(o2);
+            if (readRate1 == null && readRate2 == null)
+                return 0;
+            else if (readRate1 == null)
+                return -1;
+            else if (readRate2 == null)
+                return 1;
+            else
+                return Double.compare(readRate1, readRate2);
+        }
+    }
+
+    private static class ResampleEntry
+    {
+        public final SSTableReader sstable;
+        public final long newSpaceUsed;
+        public final int newSamplingLevel;
+
+        ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+        {
+            this.sstable = sstable;
+            this.newSpaceUsed = newSpaceUsed;
+            this.newSamplingLevel = newSamplingLevel;
+        }
+    }
+}
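
To make the sizing arithmetic in adjustSamplingLevels() concrete, here is a
self-contained sketch of the same formulas with made-up numbers:

    // 4 MB of pool left; this sstable serves 100 of 400 total reads/sec,
    // so it is offered a quarter of the remaining space.
    long remainingSpace = 4L * 1024 * 1024;
    double readsPerSec = 100.0, totalReadsPerSec = 400.0;
    long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); // 1048576

    // The summary currently holds 8192 entries in 2 MB off-heap (256 bytes each),
    // so the ideal space buys 4096 entries.
    int currentNumEntries = 8192;
    double avgEntrySize = (2L * 1024 * 1024) / (double) currentNumEntries; // 256.0
    long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize)); // 4096

    // 4096 < 8192 * DOWNSAMPLE_THESHOLD (0.75), so this sstable would be queued
    // for downsampling at whatever level calculateSamplingLevel() picks.

Because totalReadsPerSec is decremented as each (colder) sstable is handled, the
hotter sstables that follow compete for whatever space the colder ones did not
claim.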

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 64d3354..63928e2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,11 +38,12 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -75,6 +78,11 @@ public class IndexSummaryManagerTest extends SchemaLoader
     @After
     public void afterTest()
     {
+        for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+        {
+            holder.stop();
+        }
+
         String ksname = "Keyspace1";
         String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
         Keyspace keyspace = Keyspace.open(ksname);
@@ -499,4 +507,59 @@ public class IndexSummaryManagerTest extends SchemaLoader
                 assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
         }
     }
+
+    @Test
+    public void testCancelIndex() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        final int numSSTables = 4;
+        int numRows = 256;
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        for (SSTableReader sstable : sstables)
+            sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+        final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // everything should get cut in half
+        final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+        Thread t = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+                }
+                catch (CompactionInterruptedException ex)
+                {
+                    exception.set(ex);
+                }
+                catch (IOException ignored)
+                {
+                }
+            }
+        });
+        t.start();
+        while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
+            Thread.sleep(1);
+        CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        t.join();
+
+        assertNotNull("Expected compaction interrupted exception", exception.get());
+        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+        Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+        Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+        assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                 Joiner.on(",").join(disjoint)),
+                   disjoint.isEmpty());
+
+        validateData(cfs, numRows);
+    }
 }
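
For reference, the cancellation machinery this test exercises lives in
CompactionInfo.Holder, which the patch leaves untouched; it is essentially a
volatile flag (a paraphrase, so treat the exact shape as an assumption):

    public static abstract class Holder
    {
        private volatile boolean stopRequested = false;
        public abstract CompactionInfo getCompactionInfo();
        public void stop() { stopRequested = true; }         // called by stopCompaction()
        public boolean isStopRequested() { return stopRequested; }
    }

IndexSummaryRedistribution polls isStopRequested() at the top of each of its
three loops, so a stop request takes effect at an sstable boundary; the test
stops the task and then verifies that no sstables were swapped out and that the
data still validates.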


[2/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1667494
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1667494
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1667494

Branch: refs/heads/trunk
Commit: f16674949592b518ba9da837b70665df10832e9b
Parents: 7dd6b7d fc7075a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:32:31 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:32:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  14 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 279 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 349 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  83 ++++-
 7 files changed, 459 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 30a76a9,2ee8b07..5da0d42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,10 +1,32 @@@
 -2.1.13
 - * Allow cancellation of index summary redistribution (CASSANDRA-8805)
 +2.2.5
 + * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
 + * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
   * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 +Merged from 2.1:
++ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
   * Fix Stress profile parsing on Windows (CASSANDRA-10808)
  
 -
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 +Merged from 2.1:
   * Fix incremental repair hang when replica is down (CASSANDRA-10288)
   * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 65f93c0,9bddaf5..ba9c25e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1345,7 -1222,21 +1345,21 @@@ public class CompactionManager implemen
          return executor.submit(runnable);
      }
  
+     public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+     {
+         metrics.beginCompaction(redistribution);
+ 
+         try
+         {
+             return redistribution.redistributeSummaries();
+         }
+         finally
+         {
+             metrics.finishCompaction(redistribution);
+         }
+     }
+ 
 -    static int getDefaultGcBefore(ColumnFamilyStore cfs)
 +    public static int getDefaultGcBefore(ColumnFamilyStore cfs)
      {
          // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
          // add any GcGrace however since 2ndary indexes are local to a node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a14f13f,475b591..6b66ded
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -32,7 -32,7 +32,8 @@@ public enum OperationTyp
      TOMBSTONE_COMPACTION("Tombstone Compaction"),
      UNKNOWN("Unknown compaction type"),
      ANTICOMPACTION("Anticompaction after repair"),
-     VERIFY("Verify");
++    VERIFY("Verify"),
+     INDEX_SUMMARY("Index summary redistribution");
  
      private final String type;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 1dd3a4e,be5cc3c..4438dc1
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,19 -30,19 +26,20 @@@ import javax.management.MBeanServer
  import javax.management.ObjectName;
  
  import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.*;
- 
- import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import com.google.common.collect.HashMultimap;
 -import com.google.common.collect.Lists;
 -import com.google.common.collect.Multimap;
++import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
 +import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.View;
+ import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
  
@@@ -57,13 -59,13 +56,6 @@@ public class IndexSummaryManager implem
      private int resizeIntervalInMinutes = 0;
      private long memoryPoolBytes;
  
--    // The target (or ideal) number of index summary entries must differ from the actual number of
--    // entries by this ratio in order to trigger an upsample or downsample of the summary.  Because
--    // upsampling requires reading the primary index in order to rebuild the summary, the threshold
--    // for upsampling is is higher.
--    static final double UPSAMPLE_THRESHOLD = 1.5;
--    static final double DOWNSAMPLE_THESHOLD = 0.75;
--
      private final DebuggableScheduledThreadPoolExecutor executor;
  
      // our next scheduled resizing run
@@@ -251,267 -249,8 +243,8 @@@
       * @return a list of new SSTableReader instances
       */
      @VisibleForTesting
 -    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
 +    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
      {
-         logger.info("Redistributing index summaries");
-         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
-         List<SSTableReader> redistribute = new ArrayList<>();
-         for (LifecycleTransaction txn : transactions.values())
-         {
-             for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
-             {
-                 // We can't change the sampling level of sstables with the old format, because the serialization format
-                 // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
-                 logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
-                 if (!sstable.descriptor.version.hasSamplingLevel())
-                 {
-                     oldFormatSSTables.add(sstable);
-                     txn.cancel(sstable);
-                 }
-             }
-             redistribute.addAll(txn.originals());
-         }
- 
-         long total = 0;
-         for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
-             total += sstable.getIndexSummaryOffHeapSize();
- 
-         logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
-                      redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
- 
-         final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
-         double totalReadsPerSec = 0.0;
-         for (SSTableReader sstable : redistribute)
-         {
-             if (sstable.getReadMeter() != null)
-             {
-                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
-                 totalReadsPerSec += readRate;
-                 readRates.put(sstable, readRate);
-             }
-         }
-         logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
- 
-         // copy and sort by read rates (ascending)
-         List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
-         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
- 
-         long remainingBytes = memoryPoolBytes;
-         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
-             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
- 
-         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
-                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
-         List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
- 
-         for (LifecycleTransaction txn : transactions.values())
-             txn.finish();
- 
-         total = 0;
-         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
-             total += sstable.getIndexSummaryOffHeapSize();
-         logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
-                      total / 1024.0 / 1024.0);
- 
-         return newSSTables;
-     }
- 
-     private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions,
-                                                             double totalReadsPerSec, long memoryPoolCapacity) throws IOException
-     {
- 
-         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
-         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
-         List<ResampleEntry> forceResample = new ArrayList<>();
-         List<ResampleEntry> forceUpsample = new ArrayList<>();
-         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
- 
-         // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
-         // to the number of total reads/sec it handles.
-         long remainingSpace = memoryPoolCapacity;
-         for (SSTableReader sstable : sstables)
-         {
-             int minIndexInterval = sstable.metadata.getMinIndexInterval();
-             int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
- 
-             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
-             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
- 
-             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
-             int currentNumEntries = sstable.getIndexSummarySize();
-             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
-             long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
-             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
-             int maxSummarySize = sstable.getMaxIndexSummarySize();
- 
-             // if the min_index_interval changed, calculate what our current sampling level would be under the new min
-             if (sstable.getMinIndexInterval() != minIndexInterval)
-             {
-                 int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
-                 maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
-                 logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
-                              sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
-                 currentSamplingLevel = effectiveSamplingLevel;
-             }
- 
-             int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
-                     minIndexInterval, maxIndexInterval);
-             int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
-             double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
- 
-             logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
-                     "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
-                     sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
-                     currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
-                     numEntriesAtNewSamplingLevel * avgEntrySize);
- 
-             if (effectiveIndexInterval < minIndexInterval)
-             {
-                 // The min_index_interval was changed; re-sample to match it.
-                 logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
-                         sstable, effectiveIndexInterval, minIndexInterval);
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                 forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                 remainingSpace -= spaceUsed;
-             }
-             else if (effectiveIndexInterval > maxIndexInterval)
-             {
-                 // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
-                 logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
-                         sstable, effectiveIndexInterval, maxIndexInterval);
-                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
-                 numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-             }
-             else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
-             {
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
-             }
-             else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
-             {
-                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
-                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
-                 remainingSpace -= spaceUsed;
-             }
-             else
-             {
-                 // keep the same sampling level
-                 logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
-                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
-                 newSSTables.add(sstable);
-                 transactions.get(sstable.metadata.cfId).cancel(sstable);
-             }
-             totalReadsPerSec -= readsPerSec;
-         }
- 
-         if (remainingSpace > 0)
-         {
-             Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
-             toDownsample = result.right;
-             newSSTables.addAll(result.left);
-             for (SSTableReader sstable : result.left)
-                 transactions.get(sstable.metadata.cfId).cancel(sstable);
-         }
- 
-         // downsample first, then upsample
-         toDownsample.addAll(forceResample);
-         toDownsample.addAll(toUpsample);
-         toDownsample.addAll(forceUpsample);
-         for (ResampleEntry entry : toDownsample)
-         {
-             SSTableReader sstable = entry.sstable;
-             logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
-                          sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
-                          entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
-             ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
-             SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
-             newSSTables.add(replacement);
-             transactions.get(sstable.metadata.cfId).update(replacement, true);
-         }
- 
-         return newSSTables;
-     }
- 
-     @VisibleForTesting
-     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
-     {
-         // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
-         // that will make little difference.
-         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
-         {
-             public int compare(ResampleEntry o1, ResampleEntry o2)
-             {
-                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
-                                       o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
-             }
-         });
- 
-         int noDownsampleCutoff = 0;
-         List<SSTableReader> willNotDownsample = new ArrayList<>();
-         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
-         {
-             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
- 
-             long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
-             // see if we have enough leftover space to keep the current sampling level
-             if (extraSpaceRequired <= remainingSpace)
-             {
-                 logger.trace("Using leftover space to keep {} at the current sampling level ({})",
-                              entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
-                 willNotDownsample.add(entry.sstable);
-                 remainingSpace -= extraSpaceRequired;
-             }
-             else
-             {
-                 break;
-             }
- 
-             noDownsampleCutoff++;
-         }
-         return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
-     }
- 
-     private static class ResampleEntry
-     {
-         public final SSTableReader sstable;
-         public final long newSpaceUsed;
-         public final int newSamplingLevel;
- 
-         public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
-         {
-             this.sstable = sstable;
-             this.newSpaceUsed = newSpaceUsed;
-             this.newSamplingLevel = newSamplingLevel;
-         }
-     }
- 
-     /** Utility class for sorting sstables by their read rates. */
-     private static class ReadRateComparator implements Comparator<SSTableReader>
-     {
-         private final Map<SSTableReader, Double> readRates;
- 
-         public ReadRateComparator(Map<SSTableReader, Double> readRates)
-         {
-             this.readRates = readRates;
-         }
- 
-         @Override
-         public int compare(SSTableReader o1, SSTableReader o2)
-         {
-             Double readRate1 = readRates.get(o1);
-             Double readRate2 = readRates.get(o2);
-             if (readRate1 == null && readRate2 == null)
-                 return 0;
-             else if (readRate1 == null)
-                 return -1;
-             else if (readRate2 == null)
-                 return 1;
-             else
-                 return Double.compare(readRate1, readRate2);
-         }
 -        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
++        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
      }
- }
+ }
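
The block removed above survives intact in the new IndexSummaryRedistribution class, so the sizing arithmetic itself is unchanged: walking sstables from coldest to hottest, each one is offered a slice of the remaining memory pool proportional to its share of the remaining reads/sec, and that byte budget is converted into a target entry count through the average summary entry size. The following is a minimal standalone sketch of that arithmetic, with a hypothetical Table class and made-up numbers standing in for the real SSTableReader API:

import java.util.Arrays;
import java.util.List;

public class ProportionalBudgetSketch
{
    // Hypothetical stand-in for the few SSTableReader fields the loop reads.
    static class Table
    {
        final String name;
        final double readsPerSec;
        final long summaryOffHeapBytes;
        final int entries;

        Table(String name, double readsPerSec, long summaryOffHeapBytes, int entries)
        {
            this.name = name;
            this.readsPerSec = readsPerSec;
            this.summaryOffHeapBytes = summaryOffHeapBytes;
            this.entries = entries;
        }
    }

    public static void main(String[] args)
    {
        // Sorted coldest-first, as redistributeSummaries() arranges via ReadRateComparator.
        List<Table> byHotness = Arrays.asList(new Table("cold", 1.0, 1 << 20, 8192),
                                              new Table("warm", 4.0, 1 << 20, 8192),
                                              new Table("hot", 15.0, 1 << 20, 8192));
        long remainingSpace = 2L << 20;  // pool left after compacting sstables are accounted for
        double totalReadsPerSec = 20.0;  // sum over all candidates

        for (Table t : byHotness)
        {
            long idealSpace = Math.round(remainingSpace * (t.readsPerSec / totalReadsPerSec));
            double avgEntrySize = t.summaryOffHeapBytes / (double) t.entries;
            long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
            System.out.printf("%s: ideal space %d bytes -> target %d entries%n", t.name, idealSpace, targetNumEntries);
            // The real loop subtracts what the chosen sampling level actually consumes;
            // this sketch simply retires the table's share of budget and read rate.
            remainingSpace -= idealSpace;
            totalReadsPerSec -= t.readsPerSec;
        }
    }
}

Note how the hottest sstable can end up with a target above its current entry count; that is what pushes it toward the upsample branch in adjustSamplingLevels().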

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,adb3e4e..aad479b
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,338 +1,349 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.UUID;
+ 
+ import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.HashMultimap;
++import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
 -import com.google.common.collect.Multimap;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+ 
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+ 
++    // The target (or ideal) number of index summary entries must differ from the actual number of
++    // entries by this ratio in order to trigger an upsample or downsample of the summary.  Because
++    // upsampling requires reading the primary index in order to rebuild the summary, the threshold
++    // for upsampling is higher.
++    static final double UPSAMPLE_THRESHOLD = 1.5;
++    static final double DOWNSAMPLE_THESHOLD = 0.75;
++
+     private final List<SSTableReader> compacting;
 -    private final List<SSTableReader> nonCompacting;
++    private final Map<UUID, LifecycleTransaction> transactions;
+     private final long memoryPoolBytes;
++    private final UUID compactionId;
+     private volatile long remainingSpace;
+ 
 -    public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes)
++    public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+     {
+         this.compacting = compacting;
 -        this.nonCompacting = nonCompacting;
++        this.transactions = transactions;
+         this.memoryPoolBytes = memoryPoolBytes;
++        this.compactionId = UUID.randomUUID();
+     }
+ 
+     public List<SSTableReader> redistributeSummaries() throws IOException
+     {
 -        long total = 0;
 -        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
 -            total += sstable.getIndexSummaryOffHeapSize();
 -
++        logger.info("Redistributing index summaries");
+         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
 -        for (SSTableReader sstable : nonCompacting)
++        List<SSTableReader> redistribute = new ArrayList<>();
++        for (LifecycleTransaction txn : transactions.values())
+         {
 -            // We can't change the sampling level of sstables with the old format, because the serialization format
 -            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
 -            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
 -            if (!sstable.descriptor.version.hasSamplingLevel)
 -                oldFormatSSTables.add(sstable);
++            for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
++            {
++                // We can't change the sampling level of sstables with the old format, because the serialization format
++                // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
++                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
++                if (!sstable.descriptor.version.hasSamplingLevel())
++                {
++                    oldFormatSSTables.add(sstable);
++                    txn.cancel(sstable);
++                }
++            }
++            redistribute.addAll(txn.originals());
+         }
 -        nonCompacting.removeAll(oldFormatSSTables);
+ 
 -        logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
 -                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
++        long total = 0;
++        for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
++            total += sstable.getIndexSummaryOffHeapSize();
+ 
 -        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
++        logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
++                     redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
++
++        final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
+         double totalReadsPerSec = 0.0;
 -        for (SSTableReader sstable : nonCompacting)
++        for (SSTableReader sstable : redistribute)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             if (sstable.getReadMeter() != null)
+             {
+                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                 totalReadsPerSec += readRate;
+                 readRates.put(sstable, readRate);
+             }
+         }
+         logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+ 
+         // copy and sort by read rates (ascending)
 -        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
++        List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+ 
+         long remainingBytes = memoryPoolBytes;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
 -        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
++        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
++
++        for (LifecycleTransaction txn : transactions.values())
++            txn.finish();
+ 
+         total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+             total += sstable.getIndexSummaryOffHeapSize();
 -        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
++        logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+                      total / 1024.0 / 1024.0);
+ 
+         return newSSTables;
+     }
+ 
+     private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
++                                                     Map<UUID, LifecycleTransaction> transactions,
+                                                      double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+     {
 -
+         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> forceResample = new ArrayList<>();
+         List<ResampleEntry> forceUpsample = new ArrayList<>();
+         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+ 
+         // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+         // to the number of total reads/sec it handles.
+         remainingSpace = memoryPoolCapacity;
+         for (SSTableReader sstable : sstables)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             int minIndexInterval = sstable.metadata.getMinIndexInterval();
+             int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+ 
+             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+ 
+             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+             int currentNumEntries = sstable.getIndexSummarySize();
+             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+             long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+             int maxSummarySize = sstable.getMaxIndexSummarySize();
+ 
+             // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+             if (sstable.getMinIndexInterval() != minIndexInterval)
+             {
+                 int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+                 maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+                 logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+                              sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+                 currentSamplingLevel = effectiveSamplingLevel;
+             }
+ 
+             int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
 -                                                                              minIndexInterval, maxIndexInterval);
++                    minIndexInterval, maxIndexInterval);
+             int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+             double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+ 
+             logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
 -                         "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
 -                         sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
 -                         currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
 -                         numEntriesAtNewSamplingLevel * avgEntrySize);
++                    "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
++                    sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
++                    currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
++                    numEntriesAtNewSamplingLevel * avgEntrySize);
+ 
+             if (effectiveIndexInterval < minIndexInterval)
+             {
+                 // The min_index_interval was changed; re-sample to match it.
 -                logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
 -                             sstable, effectiveIndexInterval, minIndexInterval);
++                logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
++                        sstable, effectiveIndexInterval, minIndexInterval);
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (effectiveIndexInterval > maxIndexInterval)
+             {
+                 // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
 -                logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
 -                             sstable, effectiveIndexInterval, maxIndexInterval);
++                logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
++                        sstable, effectiveIndexInterval, maxIndexInterval);
+                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+                 numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
 -            else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
++            else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+             }
 -            else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
++            else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else
+             {
+                 // keep the same sampling level
+                 logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                 newSSTables.add(sstable);
++                transactions.get(sstable.metadata.cfId).cancel(sstable);
+             }
+             totalReadsPerSec -= readsPerSec;
+         }
+ 
+         if (remainingSpace > 0)
+         {
+             Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+             toDownsample = result.right;
+             newSSTables.addAll(result.left);
++            for (SSTableReader sstable : result.left)
++                transactions.get(sstable.metadata.cfId).cancel(sstable);
+         }
+ 
+         // downsample first, then upsample
+         toDownsample.addAll(forceResample);
+         toDownsample.addAll(toUpsample);
+         toDownsample.addAll(forceUpsample);
 -        Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
 -        Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
 -
 -        try
++        for (ResampleEntry entry : toDownsample)
+         {
 -            for (ResampleEntry entry : toDownsample)
 -            {
 -                if (isStopRequested())
 -                    throw new CompactionInterruptedException(getCompactionInfo());
 -
 -                SSTableReader sstable = entry.sstable;
 -                logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
 -                             sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
 -                             entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
 -                ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
 -                DataTracker tracker = cfs.getDataTracker();
 -                SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
 -                newSSTables.add(replacement);
 -                replacedByTracker.put(tracker, sstable);
 -                replacementsByTracker.put(tracker, replacement);
 -            }
 -        }
 -        finally
 -        {
 -            for (DataTracker tracker : replacedByTracker.keySet())
 -                tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
++            if (isStopRequested())
++                throw new CompactionInterruptedException(getCompactionInfo());
++
++            SSTableReader sstable = entry.sstable;
++            logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
++                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
++                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
++            ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
++            SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
++            newSSTables.add(replacement);
++            transactions.get(sstable.metadata.cfId).update(replacement, true);
+         }
+ 
+         return newSSTables;
+     }
+ 
+     @VisibleForTesting
+     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+     {
+         // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+         // that will make little difference.
+         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+         {
+             public int compare(ResampleEntry o1, ResampleEntry o2)
+             {
+                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                       o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+             }
+         });
+ 
+         int noDownsampleCutoff = 0;
+         List<SSTableReader> willNotDownsample = new ArrayList<>();
+         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+         {
+             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+ 
+             long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+             // see if we have enough leftover space to keep the current sampling level
+             if (extraSpaceRequired <= remainingSpace)
+             {
+                 logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                              entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                 willNotDownsample.add(entry.sstable);
+                 remainingSpace -= extraSpaceRequired;
+             }
+             else
+             {
+                 break;
+             }
+ 
+             noDownsampleCutoff++;
+         }
+         return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+     }
+ 
+     public CompactionInfo getCompactionInfo()
+     {
 -        return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes");
++        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+     }
+ 
+     /** Utility class for sorting sstables by their read rates. */
+     private static class ReadRateComparator implements Comparator<SSTableReader>
+     {
+         private final Map<SSTableReader, Double> readRates;
+ 
+         ReadRateComparator(Map<SSTableReader, Double> readRates)
+         {
+             this.readRates = readRates;
+         }
+ 
+         @Override
+         public int compare(SSTableReader o1, SSTableReader o2)
+         {
+             Double readRate1 = readRates.get(o1);
+             Double readRate2 = readRates.get(o2);
+             if (readRate1 == null && readRate2 == null)
+                 return 0;
+             else if (readRate1 == null)
+                 return -1;
+             else if (readRate2 == null)
+                 return 1;
+             else
+                 return Double.compare(readRate1, readRate2);
+         }
+     }
+ 
+     private static class ResampleEntry
+     {
+         public final SSTableReader sstable;
+         public final long newSpaceUsed;
+         public final int newSamplingLevel;
+ 
+         ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+         {
+             this.sstable = sstable;
+             this.newSpaceUsed = newSpaceUsed;
+             this.newSamplingLevel = newSamplingLevel;
+         }
+     }
+ }
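
One detail that is easy to lose in the diff noise is how the two constants above gate resampling in adjustSamplingLevels(): the target entry count must reach 1.5x the current count before an upsample is considered worth re-reading the primary index, while falling below 0.75x triggers a downsample. Here is a self-contained illustration of just that comparison, with illustrative numbers; the real branches additionally compare the computed sampling levels, which this sketch omits:

public class ThresholdSketch
{
    // Same values as IndexSummaryRedistribution (including the THESHOLD spelling used there).
    static final double UPSAMPLE_THRESHOLD = 1.5;
    static final double DOWNSAMPLE_THESHOLD = 0.75;

    static String decide(long targetNumEntries, int currentNumEntries)
    {
        if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD)
            return "upsample";   // worth the cost of re-reading the primary index
        if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD)
            return "downsample"; // summary is using noticeably more than its fair share
        return "keep";           // within thresholds of ideal sampling
    }

    public static void main(String[] args)
    {
        int current = 8192;
        System.out.println(decide(13000, current)); // 13000 >= 8192 * 1.5 = 12288 -> upsample
        System.out.println(decide(7000, current));  // between 6144 and 12288      -> keep
        System.out.println(decide(5000, current));  // 5000 < 8192 * 0.75 = 6144   -> downsample
    }
}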

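distributeRemainingSpace() is the other half of the budgeting: downsample candidates are sorted by how little space their downsample would actually reclaim, and leftover pool space is spent sparing those cheap-to-keep summaries first. Below is a small worked version under assumed numbers, with a hypothetical Entry class standing in for ResampleEntry:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SpareFromDownsampleSketch
{
    // Hypothetical stand-in for ResampleEntry: current vs post-downsample summary size.
    static class Entry
    {
        final String name;
        final long currentBytes;
        final long newBytes;

        Entry(String name, long currentBytes, long newBytes)
        {
            this.name = name;
            this.currentBytes = currentBytes;
            this.newBytes = newBytes;
        }

        long regained() { return currentBytes - newBytes; }
    }

    public static void main(String[] args)
    {
        List<Entry> toDownsample = new ArrayList<>(Arrays.asList(new Entry("a", 1000, 900),
                                                                 new Entry("b", 1000, 700),
                                                                 new Entry("c", 1000, 400)));
        long remainingSpace = 450;

        // Cheapest-to-spare first, mirroring the comparator in distributeRemainingSpace().
        Collections.sort(toDownsample, new Comparator<Entry>()
        {
            public int compare(Entry o1, Entry o2)
            {
                return Long.compare(o1.regained(), o2.regained());
            }
        });

        int cutoff = 0;
        while (remainingSpace > 0 && cutoff < toDownsample.size())
        {
            Entry e = toDownsample.get(cutoff);
            if (e.regained() > remainingSpace)
                break; // cannot afford to keep this one at its current level
            System.out.println("sparing " + e.name + " at its current sampling level");
            remainingSpace -= e.regained();
            cutoff++;
        }
        System.out.println(toDownsample.size() - cutoff + " sstable(s) still get downsampled");
    }
}

With 450 bytes spare, entries a (regains 100) and b (regains 300) keep their current sampling levels and only c is downsampled, which mirrors the cutoff/subList split in the real method.
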
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5e46b8e,63928e2..6935680
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -21,38 -21,33 +21,41 @@@ import java.io.IOException
  import java.nio.ByteBuffer;
  import java.util.*;
  import java.util.concurrent.*;
--import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
  
- import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
  import org.junit.After;
  import org.junit.Before;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
- 
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
--import junit.framework.Assert;
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.filter.QueryFilter;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.metrics.RestorableMeter;
  
 +import static com.google.common.collect.ImmutableMap.of;
 +import static java.util.Arrays.asList;
  import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
  import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
@@@ -106,8 -78,13 +109,13 @@@ public class IndexSummaryManagerTes
      @After
      public void afterTest()
      {
+         for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+         {
+             holder.stop();
+         }
+ 
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
 +        String ksname = KEYSPACE1;
 +        String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
          Keyspace keyspace = Keyspace.open(ksname);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
          cfs.metadata.minIndexInterval(originalMinIndexInterval);
@@@ -581,4 -507,59 +589,65 @@@
                  assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
          }
      }
+ 
+     @Test
+     public void testCancelIndex() throws Exception
+     {
 -        String ksname = "Keyspace1";
 -        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
++        String ksname = KEYSPACE1;
++        String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+         Keyspace keyspace = Keyspace.open(ksname);
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
++        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         final int numSSTables = 4;
++        final int numTries = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
+         final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // everything should get cut in half
+         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
++
+         Thread t = new Thread(new Runnable()
+         {
+             public void run()
+             {
+                 try
+                 {
 -                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
++                    // Don't leave enough space for even the minimal index summaries
++                    try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
++                    {
++                        redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
++                    }
+                 }
+                 catch (CompactionInterruptedException ex)
+                 {
+                     exception.set(ex);
+                 }
+                 catch (IOException ignored)
+                 {
+                 }
+             }
+         });
+         t.start();
+         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
 -            Thread.sleep(1);
++            Thread.yield();
+         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+         t.join();
+ 
+         assertNotNull("Expected compaction interrupted exception", exception.get());
+         assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+ 
+         Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+         Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+         Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+         assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                  Joiner.on(",").join(disjoint)),
+                    disjoint.isEmpty());
+ 
+         validateData(cfs, numRows);
+     }
  }
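
The new testCancelIndex() above doubles as a recipe for the cancellation mechanics: the redistribution only notices a stop request at its isStopRequested() checkpoints and then unwinds through CompactionInterruptedException, letting each LifecycleTransaction roll its sstables back. Reduced to plain Java, the cooperative pattern looks roughly like the sketch below; this is the idiom only, not the actual CompactionInfo.Holder API:

import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationSketch
{
    // Stand-in for CompactionInfo.Holder: stop() flips a flag that the
    // long-running task polls at safe points between units of work.
    static class Holder
    {
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);

        public void stop() { stopRequested.set(true); }
        public boolean isStopRequested() { return stopRequested.get(); }
    }

    public static void main(String[] args) throws InterruptedException
    {
        final Holder holder = new Holder();
        Thread worker = new Thread(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 1000000; i++)
                {
                    if (holder.isStopRequested())
                    {
                        // Analogous to throwing CompactionInterruptedException.
                        System.out.println("stopped cooperatively at step " + i);
                        return;
                    }
                    // ... one unit of redistribution work ...
                }
                System.out.println("finished without interruption");
            }
        });
        worker.start();
        holder.stop(); // what stopCompaction("INDEX_SUMMARY") ultimately triggers
        worker.join();
    }
}

Because stopping is cooperative, a request that lands after the last checkpoint simply lets the redistribution finish, which is why the test spins until an active compaction is visible before calling stopCompaction.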


[3/4] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65885e7f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65885e7f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65885e7f

Branch: refs/heads/trunk
Commit: 65885e7fc356c342331aec11667b5abdc28897b6
Parents: b55523e f166749
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:34:21 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:37:41 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  15 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 279 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 349 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  80 ++++-
 7 files changed, 462 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9c01160,5da0d42..5932dbb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,13 +1,37 @@@
 -2.2.5
 - * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
 - * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
 - * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 - * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
++3.0.2
+ Merged from 2.1:
+  * Allow cancellation of index summary redistribution (CASSANDRA-8805)
 - * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 -
 -2.2.4
 +3.0.1
 + * Avoid MV race during node decommission (CASSANDRA-10674)
 + * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 + * Handle single-column deletions correctly in materialized views
 +   when the column is part of the view primary key (CASSANDRA-10796)
 + * Fix issue with datadir migration on upgrade (CASSANDRA-10788)
 + * Fix bug with range tombstones on reverse queries and test coverage for
 +   AbstractBTreePartition (CASSANDRA-10059)
 + * Remove 64k limit on collection elements (CASSANDRA-10374)
 + * Remove unclear Indexer.indexes() method (CASSANDRA-10690)
 + * Fix NPE on stream read error (CASSANDRA-10771)
 + * Normalize cqlsh DESC output (CASSANDRA-10431)
 + * Rejects partition range deletions when columns are specified (CASSANDRA-10739)
 + * Fix error when saving cached key for old format sstable (CASSANDRA-10778)
 + * Invalidate prepared statements on DROP INDEX (CASSANDRA-10758)
 + * Fix SELECT statement with IN restrictions on partition key,
 +   ORDER BY and LIMIT (CASSANDRA-10729)
 + * Improve stress performance over 1k threads (CASSANDRA-7217)
 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
 + * Unable to create a function with argument of type Inet (CASSANDRA-10741)
 + * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717)
 + * Correctly preserve deletion info on updated rows when notifying indexers
 +   of single-row deletions (CASSANDRA-10694)
 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
 + * Fix regression on split size in CqlInputFormat (CASSANDRA-10835)
 + * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
 + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
   * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
   * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
   * Reject index queries while the index is building (CASSANDRA-8505)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ce7d2c,ba9c25e..bd950e3
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -50,11 -47,7 +50,12 @@@ import org.apache.cassandra.db.view.Vie
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
++import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@@ -1322,7 -1345,21 +1323,21 @@@ public class CompactionManager implemen
          return executor.submit(runnable);
      }
  
+     public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+     {
+         metrics.beginCompaction(redistribution);
+ 
+         try
+         {
+             return redistribution.redistributeSummaries();
+         }
+         finally
+         {
+             metrics.finishCompaction(redistribution);
+         }
+     }
+ 
 -    public static int getDefaultGcBefore(ColumnFamilyStore cfs)
 +    public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
      {
          // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
          // add any GcGrace however since 2ndary indexes are local to a node.

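The runIndexSummaryRedistribution() method above is what makes the task stoppable in the first place: beginCompaction() publishes the holder to the active-compaction registry, and the finally block guarantees it is deregistered even when redistributeSummaries() exits through CompactionInterruptedException. A minimal sketch of that register/try/finally bookkeeping, with a hypothetical ACTIVE set standing in for the real CompactionMetrics internals:

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArraySet;

public class MetricsBracketSketch
{
    // Hypothetical registry standing in for CompactionMetrics' active-compaction set.
    static final Set<Object> ACTIVE = new CopyOnWriteArraySet<Object>();

    static <T> T runTracked(Object holder, Callable<T> work) throws Exception
    {
        ACTIVE.add(holder); // beginCompaction: now discoverable by stopCompaction()
        try
        {
            return work.call();
        }
        finally
        {
            ACTIVE.remove(holder); // finishCompaction: runs even if work threw
        }
    }

    public static void main(String[] args) throws Exception
    {
        String result = runTracked(new Object(), new Callable<String>()
        {
            public String call() { return "done"; }
        });
        System.out.println(result + ", active compactions now: " + ACTIVE.size());
    }
}
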
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a69622b,6b66ded..20e6df2
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -33,13 -33,9 +33,14 @@@ public enum OperationTyp
      UNKNOWN("Unknown compaction type"),
      ANTICOMPACTION("Anticompaction after repair"),
      VERIFY("Verify"),
 +    FLUSH("Flush"),
 +    STREAM("Stream"),
 +    WRITE("Write"),
-     VIEW_BUILD("View build");
++    VIEW_BUILD("View build"),
+     INDEX_SUMMARY("Index summary redistribution");
  
 -    private final String type;
 +    public final String type;
 +    public final String fileName;
  
      OperationType(String type)
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index e07f297,4438dc1..aed35c9
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,10 -26,8 +26,7 @@@ import javax.management.MBeanServer
  import javax.management.ObjectName;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ImmutableSet;
 -import com.google.common.collect.Sets;
 +import com.google.common.collect.*;
- 
- import org.apache.cassandra.db.lifecycle.SSTableSet;
- import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -40,6 -38,8 +37,9 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.View;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,aad479b..b4eae31
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,349 +1,349 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+ 
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+ 
+     // The target (or ideal) number of index summary entries must differ from the actual number of
+     // entries by this ratio in order to trigger an upsample or downsample of the summary.  Because
+     // upsampling requires reading the primary index in order to rebuild the summary, the threshold
+     // for upsampling is higher.
+     static final double UPSAMPLE_THRESHOLD = 1.5;
+     static final double DOWNSAMPLE_THESHOLD = 0.75;
+ 
+     private final List<SSTableReader> compacting;
+     private final Map<UUID, LifecycleTransaction> transactions;
+     private final long memoryPoolBytes;
+     private final UUID compactionId;
+     private volatile long remainingSpace;
+ 
+     public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+     {
+         this.compacting = compacting;
+         this.transactions = transactions;
+         this.memoryPoolBytes = memoryPoolBytes;
+         this.compactionId = UUID.randomUUID();
+     }
+ 
+     public List<SSTableReader> redistributeSummaries() throws IOException
+     {
+         logger.info("Redistributing index summaries");
+         List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+         List<SSTableReader> redistribute = new ArrayList<>();
+         for (LifecycleTransaction txn : transactions.values())
+         {
+             for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
+             {
+                 // We can't change the sampling level of sstables with the old format, because the serialization format
+                 // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+                 logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+                 if (!sstable.descriptor.version.hasSamplingLevel())
+                 {
+                     oldFormatSSTables.add(sstable);
+                     txn.cancel(sstable);
+                 }
+             }
+             redistribute.addAll(txn.originals());
+         }
+ 
+         long total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
+             total += sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+                      redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+ 
+         final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
+         double totalReadsPerSec = 0.0;
+         for (SSTableReader sstable : redistribute)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             if (sstable.getReadMeter() != null)
+             {
+                 Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                 totalReadsPerSec += readRate;
+                 readRates.put(sstable, readRate);
+             }
+         }
+         logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+ 
+         // copy and sort by read rates (ascending)
+         List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+ 
+         long remainingBytes = memoryPoolBytes;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+ 
+         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                      (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+         List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
+ 
+         for (LifecycleTransaction txn : transactions.values())
+             txn.finish();
+ 
+         total = 0;
+         for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+             total += sstable.getIndexSummaryOffHeapSize();
+         logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+                      total / 1024.0 / 1024.0);
+ 
+         return newSSTables;
+     }
+ 
+     private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+                                                      Map<UUID, LifecycleTransaction> transactions,
+                                                      double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+     {
+         List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+         List<ResampleEntry> forceResample = new ArrayList<>();
+         List<ResampleEntry> forceUpsample = new ArrayList<>();
+         List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+ 
+         // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+         // to the number of total reads/sec it handles.
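+         // Worked example (hypothetical numbers): with a 100 MB pool and sstables serving 10, 30 and
+         // 60 reads/sec, the coldest is offered 100 * (10/100) = 10 MB; after it claims its share,
+         // remainingSpace and totalReadsPerSec both shrink, so the next-coldest is offered 90 * (30/90) = 30 MB.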
+         remainingSpace = memoryPoolCapacity;
+         for (SSTableReader sstable : sstables)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
 -            int minIndexInterval = sstable.metadata.getMinIndexInterval();
 -            int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
++            int minIndexInterval = sstable.metadata.params.minIndexInterval;
++            int maxIndexInterval = sstable.metadata.params.maxIndexInterval;
+ 
+             double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+             long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+ 
+             // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+             int currentNumEntries = sstable.getIndexSummarySize();
+             double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+             long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+             int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+             int maxSummarySize = sstable.getMaxIndexSummarySize();
+ 
+             // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+             if (sstable.getMinIndexInterval() != minIndexInterval)
+             {
+                 int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+                 maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+                 logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+                              sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+                 currentSamplingLevel = effectiveSamplingLevel;
+             }
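+             // For example (hypothetical values): if min_index_interval was raised from 128 to 256, an
+             // sstable written at full sampling (level 128) is effectively at level 256 under the new
+             // interval, and its maximum summary size is halved to keep the target interval consistent.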
+ 
+             int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+                     minIndexInterval, maxIndexInterval);
+             int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+             double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+ 
+             logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
+                     "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
+                     sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
+                     currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+                     numEntriesAtNewSamplingLevel * avgEntrySize);
+ 
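+             // The first two branches below repair interval violations (e.g. after an ALTER TABLE of
+             // min/max_index_interval); the threshold branches add hysteresis, so a summary is only
+             // rebuilt when its ideal entry count differs markedly from the current one.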
+             if (effectiveIndexInterval < minIndexInterval)
+             {
+                 // The min_index_interval was changed; re-sample to match it.
+                 logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
+                         sstable, effectiveIndexInterval, minIndexInterval);
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (effectiveIndexInterval > maxIndexInterval)
+             {
+                 // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+                 logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+                         sstable, effectiveIndexInterval, maxIndexInterval);
+                 newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+                 numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+             {
+                 long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+                 toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+                 remainingSpace -= spaceUsed;
+             }
+             else
+             {
+                 // keep the same sampling level
+                 logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+                 remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+                 newSSTables.add(sstable);
+                 transactions.get(sstable.metadata.cfId).cancel(sstable);
+             }
+             totalReadsPerSec -= readsPerSec;
+         }
+ 
+         if (remainingSpace > 0)
+         {
+             Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+             toDownsample = result.right;
+             newSSTables.addAll(result.left);
+             for (SSTableReader sstable : result.left)
+                 transactions.get(sstable.metadata.cfId).cancel(sstable);
+         }
+ 
+         // downsample first, then upsample
+         toDownsample.addAll(forceResample);
+         toDownsample.addAll(toUpsample);
+         toDownsample.addAll(forceUpsample);
+         for (ResampleEntry entry : toDownsample)
+         {
+             if (isStopRequested())
+                 throw new CompactionInterruptedException(getCompactionInfo());
+ 
+             SSTableReader sstable = entry.sstable;
+             logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+                          sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+                          entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+             ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+             SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+             newSSTables.add(replacement);
+             transactions.get(sstable.metadata.cfId).update(replacement, true);
+         }
+ 
+         return newSSTables;
+     }
+ 
+     @VisibleForTesting
+     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+     {
+         // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+         // that will make little difference.
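+         // Example (hypothetical sizes): if skipping the downsample of sstable A preserves only 1 KB
+         // that would have been freed while B preserves 10 MB, A sorts first and is spared first.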
+         Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+         {
+             public int compare(ResampleEntry o1, ResampleEntry o2)
+             {
+                 return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+                                       o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+             }
+         });
+ 
+         int noDownsampleCutoff = 0;
+         List<SSTableReader> willNotDownsample = new ArrayList<>();
+         while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+         {
+             ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+ 
+             long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+             // see if we have enough leftover space to keep the current sampling level
+             if (extraSpaceRequired <= remainingSpace)
+             {
+                 logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+                              entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+                 willNotDownsample.add(entry.sstable);
+                 remainingSpace -= extraSpaceRequired;
+             }
+             else
+             {
+                 break;
+             }
+ 
+             noDownsampleCutoff++;
+         }
+         return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+     }
+ 
+     public CompactionInfo getCompactionInfo()
+     {
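+         // progress is expressed in bytes of the memory pool accounted for so far; this is what
+         // surfaces in tools that list active compactions (e.g. nodetool compactionstats)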
+         return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+     }
+ 
+     /** Utility class for sorting sstables by their read rates. */
+     private static class ReadRateComparator implements Comparator<SSTableReader>
+     {
+         private final Map<SSTableReader, Double> readRates;
+ 
+         ReadRateComparator(Map<SSTableReader, Double> readRates)
+         {
+             this.readRates = readRates;
+         }
+ 
+         @Override
+         public int compare(SSTableReader o1, SSTableReader o2)
+         {
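+             // a missing read meter (e.g. an sstable opened without read tracking) sorts as coldest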
+             Double readRate1 = readRates.get(o1);
+             Double readRate2 = readRates.get(o2);
+             if (readRate1 == null && readRate2 == null)
+                 return 0;
+             else if (readRate1 == null)
+                 return -1;
+             else if (readRate2 == null)
+                 return 1;
+             else
+                 return Double.compare(readRate1, readRate2);
+         }
+     }
+ 
+     private static class ResampleEntry
+     {
+         public final SSTableReader sstable;
+         public final long newSpaceUsed;
+         public final int newSamplingLevel;
+ 
+         ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+         {
+             this.sstable = sstable;
+             this.newSpaceUsed = newSpaceUsed;
+             this.newSamplingLevel = newSamplingLevel;
+         }
+     }
+ }
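
For review convenience, the intended call pattern can be sketched as follows. This is a minimal
sketch only: the wrapper method name runIndexSummaryRedistribution and the constructor argument
order are assumptions inferred from the CompactionManager changes in the diffstat (not shown in
this mail) and from the fields used above; the per-table transaction map is expected to be built
by IndexSummaryManager.

    static List<SSTableReader> redistribute(List<SSTableReader> compacting,
                                            Map<UUID, LifecycleTransaction> transactions,
                                            long memoryPoolBytes) throws IOException
    {
        IndexSummaryRedistribution task =
            new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
        try
        {
            // registering through CompactionManager makes the task visible to
            // getCompactions() and interruptible via stopCompaction("INDEX_SUMMARY")
            return CompactionManager.instance.runIndexSummaryRedistribution(task);
        }
        catch (CompactionInterruptedException e)
        {
            // a stop request arrived mid-run; the caller closes the per-table
            // transactions, which restores the original index summaries
            return Collections.emptyList();
        }
    }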

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5493edb,6935680..0498c68
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -19,15 -19,12 +19,20 @@@ package org.apache.cassandra.io.sstable
  
  import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
++import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
++import java.util.Set;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
  
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.BeforeClass;
@@@ -39,18 -36,20 +44,22 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.cache.CachingOptions;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.compaction.OperationType;
 -import org.apache.cassandra.db.filter.QueryFilter;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 -import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.schema.CachingParams;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
  import static com.google.common.collect.ImmutableMap.of;
  import static java.util.Arrays.asList;
@@@ -587,7 -586,68 +601,67 @@@ public class IndexSummaryManagerTes
          for (Map.Entry<String, Integer> entry : intervals.entrySet())
          {
              if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL))
 -                assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
 +                assertTrue(entry.getValue() >= cfs.metadata.params.minIndexInterval);
          }
      }
+ 
+     @Test
+     public void testCancelIndex() throws Exception
+     {
+         String ksname = KEYSPACE1;
+         String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+         Keyspace keyspace = Keyspace.open(ksname);
+         final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+         final int numSSTables = 4;
 -        final int numTries = 4;
+         int numRows = 256;
+         createSSTables(ksname, cfname, numSSTables, numRows);
+ 
 -        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
++        final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+         for (SSTableReader sstable : sstables)
+             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+ 
+         final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+ 
+         // capture any interruption raised by the redistribution thread below
+         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+ 
+         Thread t = new Thread(new Runnable()
+         {
+             public void run()
+             {
+                 try
+                 {
+                     // Don't leave enough space for even the minimal index summaries
+                     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+                     {
+                         redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+                     }
+                 }
+                 catch (CompactionInterruptedException ex)
+                 {
+                     exception.set(ex);
+                 }
+                 catch (IOException ignored)
+                 {
+                 }
+             }
+         });
+         t.start();
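+         // wait until the task registers itself as an active compaction (or the worker exits),
+         // so the stop request below has something to interrupt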
+         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
 -            Thread.yield();
++            Thread.sleep(1);
+         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+         t.join();
+ 
+         assertNotNull("Expected compaction interrupted exception", exception.get());
+         assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+ 
+         Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
 -        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
++        Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
+         Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+         assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                  Joiner.on(",").join(disjoint)),
+                    disjoint.isEmpty());
+ 
+         validateData(cfs, numRows);
+     }
  }
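
The test exercises the same entry point an operator would hit; any code holding the singleton can
request cancellation equivalently (OperationType.INDEX_SUMMARY is the enum constant added by this
patch, and stopCompaction matches running tasks by their OperationType):

    // ask the compaction manager to interrupt any in-flight index summary redistribution
    CompactionManager.instance.stopCompaction(OperationType.INDEX_SUMMARY.name());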


[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f79c5ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f79c5ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f79c5ba

Branch: refs/heads/trunk
Commit: 3f79c5baae355924866b4997b18960b6590f5763
Parents: 2cd18ef 65885e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:43:02 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:43:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/db/compaction/CompactionInfo.java |  14 +-
 .../db/compaction/CompactionManager.java        |  15 +
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../io/sstable/IndexSummaryManager.java         | 279 +--------------
 .../io/sstable/IndexSummaryRedistribution.java  | 349 +++++++++++++++++++
 .../io/sstable/IndexSummaryManagerTest.java     |  80 ++++-
 7 files changed, 461 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79c5ba/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 296d3cd,5932dbb..fad4bb2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,7 +1,25 @@@
 -3.0.2
 +3.2
 + * Add compressor name in sstablemetadata output (CASSANDRA-9879)
 + * Fix type casting for counter columns (CASSANDRA-10824)
 + * Prevent running Cassandra as root (CASSANDRA-8142)
 + * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
 + * Normalize all scripts (CASSANDRA-10679)
 + * Make compression ratio much more accurate (CASSANDRA-10225)
 + * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
 + * Make index building pluggable (CASSANDRA-10681)
 + * Add sstable flush observer (CASSANDRA-10678)
 + * Improve NTS endpoints calculation (CASSANDRA-10200)
 + * Improve performance of the folderSize function (CASSANDRA-10677)
 + * Add support for type casting in selection clause (CASSANDRA-10310)
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
+ Merged from 2.1:
+  * Allow cancellation of index summary redistribution (CASSANDRA-8805)
 -3.0.1
 +
 +
 +3.1
 +Merged from 3.0:
   * Avoid MV race during node decommission (CASSANDRA-10674)
   * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
   * Handle single-column deletions correctly in materialized views

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79c5ba/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------