You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/11 17:44:05 UTC
cassandra git commit: Allow cancellation of index summary
redistribution
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 95dab2730 -> fc7075a41
Allow cancellation of index summary redistribution
Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-8805
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc7075a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc7075a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc7075a4
Branch: refs/heads/cassandra-2.1
Commit: fc7075a41837301f3866333e0eb5c464715d888c
Parents: 95dab27
Author: Carl Yeksigian <ca...@apache.org>
Authored: Tue Dec 8 12:22:25 2015 -0500
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:20:18 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionInfo.java | 14 +-
.../db/compaction/CompactionManager.java | 14 +
.../cassandra/db/compaction/OperationType.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 265 +--------------
.../io/sstable/IndexSummaryRedistribution.java | 338 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 69 +++-
7 files changed, 435 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46cda65..2ee8b07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.13
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Fix Stress profile parsing on Windows (CASSANDRA-10808)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index d086eef..e88143e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -98,9 +98,17 @@ public final class CompactionInfo implements Serializable
public String toString()
{
StringBuilder buff = new StringBuilder();
- buff.append(getTaskType()).append('@').append(getId());
- buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
- buff.append(", ").append(getCompleted()).append('/').append(getTotal());
+ buff.append(getTaskType());
+ if (cfm != null)
+ {
+ buff.append('@').append(getId()).append('(');
+ buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", ");
+ }
+ else
+ {
+ buff.append('(');
+ }
+ buff.append(getCompleted()).append('/').append(getTotal());
return buff.append(')').append(unit).toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2630ba2..9bddaf5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1222,6 +1222,20 @@ public class CompactionManager implements CompactionManagerMBean
return executor.submit(runnable);
}
+ public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+ {
+ metrics.beginCompaction(redistribution);
+
+ try
+ {
+ return redistribution.redistributeSummaries();
+ }
+ finally
+ {
+ metrics.finishCompaction(redistribution);
+ }
+ }
+
static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 15d18f6..475b591 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -31,7 +31,8 @@ public enum OperationType
/** Compaction for tombstone removal */
TOMBSTONE_COMPACTION("Tombstone Compaction"),
UNKNOWN("Unknown compaction type"),
- ANTICOMPACTION("Anticompaction after repair");
+ ANTICOMPACTION("Anticompaction after repair"),
+ INDEX_SUMMARY("Index summary redistribution");
private final String type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 0c196ff..be5cc3c 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,7 +31,6 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -45,11 +42,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-
/**
* Manages the fixed-size memory pool for index summaries, periodically resizing them
* in order to give more memory to hot sstables and less memory to cold sstables.
@@ -255,261 +251,6 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
@VisibleForTesting
public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
{
- long total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
- total += sstable.getIndexSummaryOffHeapSize();
-
- List<SSTableReader> oldFormatSSTables = new ArrayList<>();
- for (SSTableReader sstable : nonCompacting)
- {
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel)
- oldFormatSSTables.add(sstable);
- }
- nonCompacting.removeAll(oldFormatSSTables);
-
- logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
- nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
-
- final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
- double totalReadsPerSec = 0.0;
- for (SSTableReader sstable : nonCompacting)
- {
- if (sstable.getReadMeter() != null)
- {
- Double readRate = sstable.getReadMeter().fifteenMinuteRate();
- totalReadsPerSec += readRate;
- readRates.put(sstable, readRate);
- }
- }
- logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
-
- // copy and sort by read rates (ascending)
- List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
- Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
-
- long remainingBytes = memoryPoolBytes;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
- remainingBytes -= sstable.getIndexSummaryOffHeapSize();
-
- logger.trace("Index summaries for compacting SSTables are using {} MB of space",
- (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
- List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
-
- total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
- total += sstable.getIndexSummaryOffHeapSize();
- logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
- total / 1024.0 / 1024.0);
-
- return newSSTables;
- }
-
- private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
- double totalReadsPerSec, long memoryPoolCapacity) throws IOException
- {
-
- List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> forceResample = new ArrayList<>();
- List<ResampleEntry> forceUpsample = new ArrayList<>();
- List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
-
- // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
- // to the number of total reads/sec it handles.
- long remainingSpace = memoryPoolCapacity;
- for (SSTableReader sstable : sstables)
- {
- int minIndexInterval = sstable.metadata.getMinIndexInterval();
- int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
-
- double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
- long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
-
- // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
- int currentNumEntries = sstable.getIndexSummarySize();
- double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
- long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
- int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
- int maxSummarySize = sstable.getMaxIndexSummarySize();
-
- // if the min_index_interval changed, calculate what our current sampling level would be under the new min
- if (sstable.getMinIndexInterval() != minIndexInterval)
- {
- int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
- maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
- logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
- sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
- currentSamplingLevel = effectiveSamplingLevel;
- }
-
- int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
- minIndexInterval, maxIndexInterval);
- int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
- double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
-
- logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
- "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
- sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
- currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
- numEntriesAtNewSamplingLevel * avgEntrySize);
-
- if (effectiveIndexInterval < minIndexInterval)
- {
- // The min_index_interval was changed; re-sample to match it.
- logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
- sstable, effectiveIndexInterval, minIndexInterval);
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else if (effectiveIndexInterval > maxIndexInterval)
- {
- // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
- logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
- sstable, effectiveIndexInterval, maxIndexInterval);
- newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
- numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else
- {
- // keep the same sampling level
- logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
- remainingSpace -= sstable.getIndexSummaryOffHeapSize();
- newSSTables.add(sstable);
- }
- totalReadsPerSec -= readsPerSec;
- }
-
- if (remainingSpace > 0)
- {
- Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
- toDownsample = result.right;
- newSSTables.addAll(result.left);
- }
-
- // downsample first, then upsample
- toDownsample.addAll(forceResample);
- toDownsample.addAll(toUpsample);
- toDownsample.addAll(forceUpsample);
- Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
- Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
- for (ResampleEntry entry : toDownsample)
- {
- SSTableReader sstable = entry.sstable;
- logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
- sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
- entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
- DataTracker tracker = cfs.getDataTracker();
-
- replacedByTracker.put(tracker, sstable);
- replacementsByTracker.put(tracker, replacement);
- }
-
- for (DataTracker tracker : replacedByTracker.keySet())
- {
- tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
- newSSTables.addAll(replacementsByTracker.get(tracker));
- }
-
- return newSSTables;
- }
-
- @VisibleForTesting
- static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
- {
- // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
- // that will make little difference.
- Collections.sort(toDownsample, new Comparator<ResampleEntry>()
- {
- public int compare(ResampleEntry o1, ResampleEntry o2)
- {
- return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
- o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
- }
- });
-
- int noDownsampleCutoff = 0;
- List<SSTableReader> willNotDownsample = new ArrayList<>();
- while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
- {
- ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
-
- long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
- // see if we have enough leftover space to keep the current sampling level
- if (extraSpaceRequired <= remainingSpace)
- {
- logger.trace("Using leftover space to keep {} at the current sampling level ({})",
- entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
- willNotDownsample.add(entry.sstable);
- remainingSpace -= extraSpaceRequired;
- }
- else
- {
- break;
- }
-
- noDownsampleCutoff++;
- }
- return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
- }
-
- private static class ResampleEntry
- {
- public final SSTableReader sstable;
- public final long newSpaceUsed;
- public final int newSamplingLevel;
-
- public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
- {
- this.sstable = sstable;
- this.newSpaceUsed = newSpaceUsed;
- this.newSamplingLevel = newSamplingLevel;
- }
- }
-
- /** Utility class for sorting sstables by their read rates. */
- private static class ReadRateComparator implements Comparator<SSTableReader>
- {
- private final Map<SSTableReader, Double> readRates;
-
- public ReadRateComparator(Map<SSTableReader, Double> readRates)
- {
- this.readRates = readRates;
- }
-
- @Override
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- Double readRate1 = readRates.get(o1);
- Double readRate2 = readRates.get(o2);
- if (readRate1 == null && readRate2 == null)
- return 0;
- else if (readRate1 == null)
- return -1;
- else if (readRate2 == null)
- return 1;
- else
- return Double.compare(readRate1, readRate2);
- }
+ return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
new file mode 100644
index 0000000..adb3e4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+public class IndexSummaryRedistribution extends CompactionInfo.Holder
+{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
+ private final List<SSTableReader> compacting;
+ private final List<SSTableReader> nonCompacting;
+ private final long memoryPoolBytes;
+ private volatile long remainingSpace;
+
+ public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes)
+ {
+ this.compacting = compacting;
+ this.nonCompacting = nonCompacting;
+ this.memoryPoolBytes = memoryPoolBytes;
+ }
+
+ public List<SSTableReader> redistributeSummaries() throws IOException
+ {
+ long total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+ total += sstable.getIndexSummaryOffHeapSize();
+
+ List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+ for (SSTableReader sstable : nonCompacting)
+ {
+ // We can't change the sampling level of sstables with the old format, because the serialization format
+ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
+ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+ if (!sstable.descriptor.version.hasSamplingLevel)
+ oldFormatSSTables.add(sstable);
+ }
+ nonCompacting.removeAll(oldFormatSSTables);
+
+ logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+ nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+ final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+ double totalReadsPerSec = 0.0;
+ for (SSTableReader sstable : nonCompacting)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ if (sstable.getReadMeter() != null)
+ {
+ Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+ totalReadsPerSec += readRate;
+ readRates.put(sstable, readRate);
+ }
+ }
+ logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+ // copy and sort by read rates (ascending)
+ List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+ Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+ long remainingBytes = memoryPoolBytes;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+ remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+ logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+ (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+
+ total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+ total += sstable.getIndexSummaryOffHeapSize();
+ logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
+ total / 1024.0 / 1024.0);
+
+ return newSSTables;
+ }
+
+ private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+ double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+ {
+
+ List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> forceResample = new ArrayList<>();
+ List<ResampleEntry> forceUpsample = new ArrayList<>();
+ List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+ // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+ // to the number of total reads/sec it handles.
+ remainingSpace = memoryPoolCapacity;
+ for (SSTableReader sstable : sstables)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ int minIndexInterval = sstable.metadata.getMinIndexInterval();
+ int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+
+ double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+ long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+ // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+ int currentNumEntries = sstable.getIndexSummarySize();
+ double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+ long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+ int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+ int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+ // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+ if (sstable.getMinIndexInterval() != minIndexInterval)
+ {
+ int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+ maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+ logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+ sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+ currentSamplingLevel = effectiveSamplingLevel;
+ }
+
+ int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+ minIndexInterval, maxIndexInterval);
+ int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+ double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+ logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
+ "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
+ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
+ currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+ numEntriesAtNewSamplingLevel * avgEntrySize);
+
+ if (effectiveIndexInterval < minIndexInterval)
+ {
+ // The min_index_interval was changed; re-sample to match it.
+ logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
+ sstable, effectiveIndexInterval, minIndexInterval);
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else if (effectiveIndexInterval > maxIndexInterval)
+ {
+ // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+ logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+ sstable, effectiveIndexInterval, maxIndexInterval);
+ newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+ numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
+ else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
+ else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else
+ {
+ // keep the same sampling level
+ logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+ remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+ newSSTables.add(sstable);
+ }
+ totalReadsPerSec -= readsPerSec;
+ }
+
+ if (remainingSpace > 0)
+ {
+ Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+ toDownsample = result.right;
+ newSSTables.addAll(result.left);
+ }
+
+ // downsample first, then upsample
+ toDownsample.addAll(forceResample);
+ toDownsample.addAll(toUpsample);
+ toDownsample.addAll(forceUpsample);
+ Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+ Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+
+ try
+ {
+ for (ResampleEntry entry : toDownsample)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ SSTableReader sstable = entry.sstable;
+ logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+ ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ DataTracker tracker = cfs.getDataTracker();
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+ newSSTables.add(replacement);
+ replacedByTracker.put(tracker, sstable);
+ replacementsByTracker.put(tracker, replacement);
+ }
+ }
+ finally
+ {
+ for (DataTracker tracker : replacedByTracker.keySet())
+ tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+ }
+
+ return newSSTables;
+ }
+
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+ {
+ // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+ // that will make little difference.
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+ {
+ public int compare(ResampleEntry o1, ResampleEntry o2)
+ {
+ return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+ o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+ }
+ });
+
+ int noDownsampleCutoff = 0;
+ List<SSTableReader> willNotDownsample = new ArrayList<>();
+ while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+ {
+ ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+ long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+ // see if we have enough leftover space to keep the current sampling level
+ if (extraSpaceRequired <= remainingSpace)
+ {
+ logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+ entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+ willNotDownsample.add(entry.sstable);
+ remainingSpace -= extraSpaceRequired;
+ }
+ else
+ {
+ break;
+ }
+
+ noDownsampleCutoff++;
+ }
+ return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes"); // completed = pool bytes already allocated (total minus remaining)
+ }
+
+ /** Utility class for sorting sstables by their read rates. */
+ private static class ReadRateComparator implements Comparator<SSTableReader>
+ {
+ private final Map<SSTableReader, Double> readRates;
+
+ ReadRateComparator(Map<SSTableReader, Double> readRates)
+ {
+ this.readRates = readRates;
+ }
+
+ @Override
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ Double readRate1 = readRates.get(o1);
+ Double readRate2 = readRates.get(o2);
+ if (readRate1 == null && readRate2 == null)
+ return 0;
+ else if (readRate1 == null)
+ return -1;
+ else if (readRate2 == null)
+ return 1;
+ else
+ return Double.compare(readRate1, readRate2);
+ }
+ }
+
+ private static class ResampleEntry
+ {
+ public final SSTableReader sstable;
+ public final long newSpaceUsed;
+ public final int newSamplingLevel;
+
+ ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+ {
+ this.sstable = sstable;
+ this.newSpaceUsed = newSpaceUsed;
+ this.newSamplingLevel = newSamplingLevel;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 64d3354..63928e2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +38,12 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -75,6 +78,11 @@ public class IndexSummaryManagerTest extends SchemaLoader
@After
public void afterTest()
{
+ for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+ {
+ holder.stop();
+ }
+
String ksname = "Keyspace1";
String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
Keyspace keyspace = Keyspace.open(ksname);
@@ -499,4 +507,59 @@ public class IndexSummaryManagerTest extends SchemaLoader
assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
}
}
+
+    // Starts a summary redistribution on a background thread, stops it through the CompactionManager,
+    // and checks that the interruption is reported and that the set of sstables and their data are unchanged.
+    @Test
+    public void testCancelIndex() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+        ColumnFamilyStore cfs = Keyspace.open(ksname).getColumnFamilyStore(cfname);
+        final int numSSTables = 4;
+        int numRows = 256;
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        for (SSTableReader reader : sstables)
+            reader.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+        // the pool is (numSSTables / 2) times the first summary's off-heap size,
+        // so everything should get cut in half
+        final long memoryPoolBytes = sstables.get(0).getIndexSummaryOffHeapSize() * (numSSTables / 2);
+
+        final AtomicReference<CompactionInterruptedException> interruption = new AtomicReference<>();
+        Runnable redistribution = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, memoryPoolBytes);
+                }
+                catch (CompactionInterruptedException ex)
+                {
+                    // hand the interruption back to the test thread for the assertion below
+                    interruption.set(ex);
+                }
+                catch (IOException ignored)
+                {
+                    // NOTE(review): dropped silently; the assertNotNull below then fails -- confirm this is intended
+                }
+            }
+        };
+        Thread worker = new Thread(redistribution);
+        worker.start();
+
+        // poll until an active compaction is reported or the worker has already finished, then request the stop
+        while (CompactionManager.instance.getActiveCompactions() == 0 && worker.isAlive())
+            Thread.sleep(1);
+        CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        worker.join();
+
+        assertNotNull("Expected compaction interrupted exception", interruption.get());
+        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+        // the cancelled redistribution must leave the column family with exactly the sstables it started with
+        Set<SSTableReader> before = new HashSet<>(sstables);
+        Set<SSTableReader> after = new HashSet<>(cfs.getSSTables());
+        Set<SSTableReader> mismatched = Sets.symmetricDifference(before, after);
+        assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+                                 Joiner.on(",").join(mismatched)),
+                   mismatched.isEmpty());
+
+        validateData(cfs, numRows);
+    }
}