You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/11 17:45:16 UTC
[1/4] cassandra git commit: Allow cancellation of index summary
redistribution
Repository: cassandra
Updated Branches:
refs/heads/trunk 2cd18ef5a -> 3f79c5baa
Allow cancellation of index summary redistribution
Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-8805
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc7075a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc7075a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc7075a4
Branch: refs/heads/trunk
Commit: fc7075a41837301f3866333e0eb5c464715d888c
Parents: 95dab27
Author: Carl Yeksigian <ca...@apache.org>
Authored: Tue Dec 8 12:22:25 2015 -0500
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:20:18 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionInfo.java | 14 +-
.../db/compaction/CompactionManager.java | 14 +
.../cassandra/db/compaction/OperationType.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 265 +--------------
.../io/sstable/IndexSummaryRedistribution.java | 338 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 69 +++-
7 files changed, 435 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46cda65..2ee8b07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.13
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Fix Stress profile parsing on Windows (CASSANDRA-10808)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index d086eef..e88143e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -98,9 +98,17 @@ public final class CompactionInfo implements Serializable
public String toString()
{
StringBuilder buff = new StringBuilder();
- buff.append(getTaskType()).append('@').append(getId());
- buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
- buff.append(", ").append(getCompleted()).append('/').append(getTotal());
+ buff.append(getTaskType());
+ if (cfm != null)
+ {
+ buff.append('@').append(getId()).append('(');
+ buff.append(getKeyspace()).append(", ").append(getColumnFamily()).append(", ");
+ }
+ else
+ {
+ buff.append('(');
+ }
+ buff.append(getCompleted()).append('/').append(getTotal());
return buff.append(')').append(unit).toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2630ba2..9bddaf5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1222,6 +1222,20 @@ public class CompactionManager implements CompactionManagerMBean
return executor.submit(runnable);
}
+ public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+ {
+ metrics.beginCompaction(redistribution);
+
+ try
+ {
+ return redistribution.redistributeSummaries();
+ }
+ finally
+ {
+ metrics.finishCompaction(redistribution);
+ }
+ }
+
static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 15d18f6..475b591 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -31,7 +31,8 @@ public enum OperationType
/** Compaction for tombstone removal */
TOMBSTONE_COMPACTION("Tombstone Compaction"),
UNKNOWN("Unknown compaction type"),
- ANTICOMPACTION("Anticompaction after repair");
+ ANTICOMPACTION("Anticompaction after repair"),
+ INDEX_SUMMARY("Index summary redistribution");
private final String type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 0c196ff..be5cc3c 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,7 +31,6 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -45,11 +42,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-
/**
* Manages the fixed-size memory pool for index summaries, periodically resizing them
* in order to give more memory to hot sstables and less memory to cold sstables.
@@ -255,261 +251,6 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
@VisibleForTesting
public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
{
- long total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
- total += sstable.getIndexSummaryOffHeapSize();
-
- List<SSTableReader> oldFormatSSTables = new ArrayList<>();
- for (SSTableReader sstable : nonCompacting)
- {
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel)
- oldFormatSSTables.add(sstable);
- }
- nonCompacting.removeAll(oldFormatSSTables);
-
- logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
- nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
-
- final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
- double totalReadsPerSec = 0.0;
- for (SSTableReader sstable : nonCompacting)
- {
- if (sstable.getReadMeter() != null)
- {
- Double readRate = sstable.getReadMeter().fifteenMinuteRate();
- totalReadsPerSec += readRate;
- readRates.put(sstable, readRate);
- }
- }
- logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
-
- // copy and sort by read rates (ascending)
- List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
- Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
-
- long remainingBytes = memoryPoolBytes;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
- remainingBytes -= sstable.getIndexSummaryOffHeapSize();
-
- logger.trace("Index summaries for compacting SSTables are using {} MB of space",
- (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
- List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
-
- total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
- total += sstable.getIndexSummaryOffHeapSize();
- logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
- total / 1024.0 / 1024.0);
-
- return newSSTables;
- }
-
- private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
- double totalReadsPerSec, long memoryPoolCapacity) throws IOException
- {
-
- List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> forceResample = new ArrayList<>();
- List<ResampleEntry> forceUpsample = new ArrayList<>();
- List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
-
- // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
- // to the number of total reads/sec it handles.
- long remainingSpace = memoryPoolCapacity;
- for (SSTableReader sstable : sstables)
- {
- int minIndexInterval = sstable.metadata.getMinIndexInterval();
- int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
-
- double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
- long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
-
- // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
- int currentNumEntries = sstable.getIndexSummarySize();
- double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
- long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
- int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
- int maxSummarySize = sstable.getMaxIndexSummarySize();
-
- // if the min_index_interval changed, calculate what our current sampling level would be under the new min
- if (sstable.getMinIndexInterval() != minIndexInterval)
- {
- int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
- maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
- logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
- sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
- currentSamplingLevel = effectiveSamplingLevel;
- }
-
- int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
- minIndexInterval, maxIndexInterval);
- int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
- double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
-
- logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
- "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
- sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
- currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
- numEntriesAtNewSamplingLevel * avgEntrySize);
-
- if (effectiveIndexInterval < minIndexInterval)
- {
- // The min_index_interval was changed; re-sample to match it.
- logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
- sstable, effectiveIndexInterval, minIndexInterval);
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else if (effectiveIndexInterval > maxIndexInterval)
- {
- // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
- logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
- sstable, effectiveIndexInterval, maxIndexInterval);
- newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
- numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else
- {
- // keep the same sampling level
- logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
- remainingSpace -= sstable.getIndexSummaryOffHeapSize();
- newSSTables.add(sstable);
- }
- totalReadsPerSec -= readsPerSec;
- }
-
- if (remainingSpace > 0)
- {
- Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
- toDownsample = result.right;
- newSSTables.addAll(result.left);
- }
-
- // downsample first, then upsample
- toDownsample.addAll(forceResample);
- toDownsample.addAll(toUpsample);
- toDownsample.addAll(forceUpsample);
- Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
- Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
- for (ResampleEntry entry : toDownsample)
- {
- SSTableReader sstable = entry.sstable;
- logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
- sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
- entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
- DataTracker tracker = cfs.getDataTracker();
-
- replacedByTracker.put(tracker, sstable);
- replacementsByTracker.put(tracker, replacement);
- }
-
- for (DataTracker tracker : replacedByTracker.keySet())
- {
- tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
- newSSTables.addAll(replacementsByTracker.get(tracker));
- }
-
- return newSSTables;
- }
-
- @VisibleForTesting
- static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
- {
- // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
- // that will make little difference.
- Collections.sort(toDownsample, new Comparator<ResampleEntry>()
- {
- public int compare(ResampleEntry o1, ResampleEntry o2)
- {
- return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
- o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
- }
- });
-
- int noDownsampleCutoff = 0;
- List<SSTableReader> willNotDownsample = new ArrayList<>();
- while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
- {
- ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
-
- long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
- // see if we have enough leftover space to keep the current sampling level
- if (extraSpaceRequired <= remainingSpace)
- {
- logger.trace("Using leftover space to keep {} at the current sampling level ({})",
- entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
- willNotDownsample.add(entry.sstable);
- remainingSpace -= extraSpaceRequired;
- }
- else
- {
- break;
- }
-
- noDownsampleCutoff++;
- }
- return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
- }
-
- private static class ResampleEntry
- {
- public final SSTableReader sstable;
- public final long newSpaceUsed;
- public final int newSamplingLevel;
-
- public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
- {
- this.sstable = sstable;
- this.newSpaceUsed = newSpaceUsed;
- this.newSamplingLevel = newSamplingLevel;
- }
- }
-
- /** Utility class for sorting sstables by their read rates. */
- private static class ReadRateComparator implements Comparator<SSTableReader>
- {
- private final Map<SSTableReader, Double> readRates;
-
- public ReadRateComparator(Map<SSTableReader, Double> readRates)
- {
- this.readRates = readRates;
- }
-
- @Override
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- Double readRate1 = readRates.get(o1);
- Double readRate2 = readRates.get(o2);
- if (readRate1 == null && readRate2 == null)
- return 0;
- else if (readRate1 == null)
- return -1;
- else if (readRate2 == null)
- return 1;
- else
- return Double.compare(readRate1, readRate2);
- }
+ return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
new file mode 100644
index 0000000..adb3e4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+public class IndexSummaryRedistribution extends CompactionInfo.Holder
+{
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
+ private final List<SSTableReader> compacting;
+ private final List<SSTableReader> nonCompacting;
+ private final long memoryPoolBytes;
+ private volatile long remainingSpace;
+
+    /**
+     * Creates a redistribution task over the given sstables.
+     *
+     * @param compacting sstables that are currently compacting; their index summary sizes are only
+     *                   subtracted from the pool, they are not resampled
+     * @param nonCompacting candidates for resampling; the list is kept by reference and
+     *                      {@code redistributeSummaries()} removes old-format sstables from it
+     * @param memoryPoolBytes size of the index summary memory pool, in bytes
+     */
+    public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes)
+    {
+        this.compacting = compacting;
+        this.nonCompacting = nonCompacting;
+        this.memoryPoolBytes = memoryPoolBytes;
+        // Nothing has been allocated yet. Without this the field keeps its default of 0 until
+        // adjustSamplingLevels() assigns it, so progress reported before that point (e.g. when a stop is
+        // requested while read rates are still being collected) would not describe an untouched pool.
+        this.remainingSpace = memoryPoolBytes;
+    }
+
+    /**
+     * Redistributes the index summary memory pool across the non-compacting sstables.
+     * <p>
+     * SSTables whose format has no sampling level are removed from {@code nonCompacting} (the list passed to
+     * the constructor is modified). Their current summary sizes, together with those of the compacting
+     * sstables, are subtracted from the pool before the rest is handed to {@code adjustSamplingLevels}.
+     *
+     * @return the sstables returned by {@code adjustSamplingLevels}; compacting and old-format sstables are
+     *         not part of the result
+     * @throws IOException declared for the resampling step
+     * @throws CompactionInterruptedException if a stop was requested during the redistribution
+     */
+    public List<SSTableReader> redistributeSummaries() throws IOException
+    {
+        long total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+        for (SSTableReader sstable : nonCompacting)
+        {
+            // We can't change the sampling level of sstables with the old format, because the serialization format
+            // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
+            if (!sstable.descriptor.version.hasSamplingLevel)
+            {
+                // only report the sstables that are actually skipped, not every sstable we look at
+                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+                oldFormatSSTables.add(sstable);
+            }
+        }
+        nonCompacting.removeAll(oldFormatSSTables);
+
+        logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+                     nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+        final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
+        double totalReadsPerSec = 0.0;
+        for (SSTableReader sstable : nonCompacting)
+        {
+            if (isStopRequested())
+                throw new CompactionInterruptedException(getCompactionInfo());
+
+            // sstables without a read meter get no entry and are sorted first by ReadRateComparator
+            if (sstable.getReadMeter() != null)
+            {
+                Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+                totalReadsPerSec += readRate;
+                readRates.put(sstable, readRate);
+            }
+        }
+        logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+        // copy and sort by read rates (ascending)
+        List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
+        Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+        // summaries that cannot be resized still occupy part of the pool
+        long remainingBytes = memoryPoolBytes;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+            remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+        logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+                     (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+        List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
+
+        total = 0;
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+            total += sstable.getIndexSummaryOffHeapSize();
+        logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
+                     total / 1024.0 / 1024.0);
+
+        return newSSTables;
+    }
+
+ private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+ double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+ {
+ // Picks a sampling level for each sstable from its share of the reads, then builds the resampled replacements and returns them together with the sstables left as they are.
+ List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> forceResample = new ArrayList<>();
+ List<ResampleEntry> forceUpsample = new ArrayList<>();
+ List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+ // Going through the sstables in the order given (the caller sorts them by ascending read rate, i.e. coldest
+ // first), try to give each sstable an amount of space proportional to the number of total reads/sec it handles.
+ remainingSpace = memoryPoolCapacity; // volatile field, also read by getCompactionInfo()
+ for (SSTableReader sstable : sstables)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ int minIndexInterval = sstable.metadata.getMinIndexInterval();
+ int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+
+ double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+ long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); // share of the space still left, weighted against the reads of the sstables not yet handled
+
+ // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+ int currentNumEntries = sstable.getIndexSummarySize();
+ double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+ long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+ int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+ int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+ // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+ if (sstable.getMinIndexInterval() != minIndexInterval)
+ {
+ int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+ maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+ logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+ sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+ currentSamplingLevel = effectiveSamplingLevel;
+ }
+
+ int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+ minIndexInterval, maxIndexInterval);
+ int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+ double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+ logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
+ "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
+ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
+ currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+ numEntriesAtNewSamplingLevel * avgEntrySize);
+
+ if (effectiveIndexInterval < minIndexInterval)
+ {
+ // The min_index_interval was changed; re-sample to match it.
+ logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
+ sstable, effectiveIndexInterval, minIndexInterval);
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else if (effectiveIndexInterval > maxIndexInterval)
+ {
+ // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+ logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+ sstable, effectiveIndexInterval, maxIndexInterval);
+ newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+ numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; // NOTE(review): subtracts the un-rounded double (narrowed back to long), not spaceUsed as the resample/downsample branches do - confirm intended
+ }
+ else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel; // NOTE(review): same un-rounded subtraction as in the forced-upsample branch - confirm intended
+ }
+ else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else
+ {
+ // keep the same sampling level
+ logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+ remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+ newSSTables.add(sstable);
+ }
+ totalReadsPerSec -= readsPerSec; // later (hotter) sstables are weighted against the reads that are still unassigned
+ }
+
+ if (remainingSpace > 0)
+ {
+ Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+ toDownsample = result.right; // entries that still have to be downsampled (a sub-list view of the sorted list)
+ newSSTables.addAll(result.left); // sstables that keep their current sampling level after all
+ }
+
+ // resample in a single pass, in this order: downsamples, forced resamples, upsamples, forced upsamples
+ toDownsample.addAll(forceResample);
+ toDownsample.addAll(toUpsample);
+ toDownsample.addAll(forceUpsample);
+ Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
+ Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
+ // the finally block hands every replacement built so far to its tracker, also when a stop request aborts the loop
+ try
+ {
+ for (ResampleEntry entry : toDownsample)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ SSTableReader sstable = entry.sstable;
+ logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+ ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ DataTracker tracker = cfs.getDataTracker();
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+ newSSTables.add(replacement);
+ replacedByTracker.put(tracker, sstable);
+ replacementsByTracker.put(tracker, replacement);
+ }
+ }
+ finally
+ {
+ for (DataTracker tracker : replacedByTracker.keySet())
+ tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
+ }
+
+ return newSSTables;
+ }
+
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+ {
+ // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+ // that will make little difference.
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+ {
+ public int compare(ResampleEntry o1, ResampleEntry o2)
+ {
+ return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+ o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+ }
+ });
+
+ int noDownsampleCutoff = 0;
+ List<SSTableReader> willNotDownsample = new ArrayList<>();
+ while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+ {
+ ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+ long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+ // see if we have enough leftover space to keep the current sampling level
+ if (extraSpaceRequired <= remainingSpace)
+ {
+ logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+ entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+ willNotDownsample.add(entry.sstable);
+ remainingSpace -= extraSpaceRequired;
+ }
+ else
+ {
+ break;
+ }
+
+ noDownsampleCutoff++;
+ }
+ return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+ }
+
+    /**
+     * Reports the progress of this redistribution in bytes of the memory pool.
+     *
+     * @return a {@link CompactionInfo} of type {@code INDEX_SUMMARY} whose completed value is the part of the
+     *         pool that has been assigned so far and whose total is the pool size
+     */
+    public CompactionInfo getCompactionInfo()
+    {
+        // completed = pool size minus what is still unassigned; the reverse subtraction is never positive
+        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes");
+    }
+
+    /**
+     * Orders sstables by the read rate recorded for them in the supplied map, lowest rate first.
+     * SSTables that have no entry in the map sort before all sstables that do, and compare as equal to each other.
+     */
+    private static class ReadRateComparator implements Comparator<SSTableReader>
+    {
+        private final Map<SSTableReader, Double> readRates;
+
+        ReadRateComparator(Map<SSTableReader, Double> readRates)
+        {
+            this.readRates = readRates;
+        }
+
+        @Override
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            Double first = readRates.get(o1);
+            Double second = readRates.get(o2);
+
+            // a missing rate sorts first
+            if (first == null)
+                return second == null ? 0 : -1;
+            if (second == null)
+                return 1;
+
+            return Double.compare(first, second);
+        }
+    }
+
+ private static class ResampleEntry // an sstable paired with the sampling level chosen for it and the space it is expected to use at that level
+ {
+ public final SSTableReader sstable; // the sstable to resample
+ public final long newSpaceUsed; // estimated index summary size, in bytes, at newSamplingLevel
+ public final int newSamplingLevel; // sampling level the sstable is to be moved to
+
+ ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+ {
+ this.sstable = sstable;
+ this.newSpaceUsed = newSpaceUsed;
+ this.newSamplingLevel = newSamplingLevel;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc7075a4/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 64d3354..63928e2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +38,12 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -75,6 +78,11 @@ public class IndexSummaryManagerTest extends SchemaLoader
@After
public void afterTest()
{
+ for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+ {
+ holder.stop();
+ }
+
String ksname = "Keyspace1";
String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
Keyspace keyspace = Keyspace.open(ksname);
@@ -499,4 +507,59 @@ public class IndexSummaryManagerTest extends SchemaLoader
assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
}
}
+
+ @Test
+ public void testCancelIndex() throws Exception // checks that stopping an INDEX_SUMMARY operation aborts redistribution and leaves the sstable set unchanged
+ {
+ String ksname = "Keyspace1";
+ String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+ Keyspace keyspace = Keyspace.open(ksname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ final int numSSTables = 4;
+ int numRows = 256;
+ createSSTables(ksname, cfname, numSSTables, numRows);
+
+ final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+ for (SSTableReader sstable : sstables)
+ sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0)); // identical meter values for every sstable
+
+ final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+ // the pool passed below only fits numSSTables / 2 summaries of this size, so a completed run would cut everything in half
+ final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+ Thread t = new Thread(new Runnable() // run on a separate thread so this thread can issue the stop request
+ {
+ public void run()
+ {
+ try
+ {
+ redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
+ }
+ catch (CompactionInterruptedException ex)
+ {
+ exception.set(ex); // expected outcome: hand the interruption back to the test thread
+ }
+ catch (IOException ignored)
+ { // nothing recorded here; the assertNotNull below then fails because 'exception' stays unset
+ }
+ }
+ });
+ t.start();
+ while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive()) // poll until a compaction is reported active or the worker has already exited
+ Thread.sleep(1);
+ CompactionManager.instance.stopCompaction("INDEX_SUMMARY"); // presumably matched against the OperationType name - confirm
+ t.join();
+
+ assertNotNull("Expected compaction interrupted exception", exception.get()); // NOTE(review): if the worker finishes before the stop request lands, nothing is captured and this fails
+ assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+ Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+ Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+ Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables); // any element here was added or removed by the cancelled run
+ assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+ Joiner.on(",").join(disjoint)),
+ disjoint.isEmpty());
+
+ validateData(cfs, numRows);
+ }
}
[2/4] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1667494
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1667494
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1667494
Branch: refs/heads/trunk
Commit: f16674949592b518ba9da837b70665df10832e9b
Parents: 7dd6b7d fc7075a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:32:31 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:32:31 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionInfo.java | 14 +-
.../db/compaction/CompactionManager.java | 14 +
.../cassandra/db/compaction/OperationType.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 279 +--------------
.../io/sstable/IndexSummaryRedistribution.java | 349 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 83 ++++-
7 files changed, 459 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 30a76a9,2ee8b07..5da0d42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,10 +1,32 @@@
-2.1.13
- * Allow cancellation of index summary redistribution (CASSANDRA-8805)
+2.2.5
+ * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
+ * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
+ * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
+Merged from 2.1:
++ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
* Fix Stress profile parsing on Windows (CASSANDRA-10808)
-
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
+Merged from 2.1:
* Fix incremental repair hang when replica is down (CASSANDRA-10288)
* Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 65f93c0,9bddaf5..ba9c25e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1345,7 -1222,21 +1345,21 @@@ public class CompactionManager implemen
return executor.submit(runnable);
}
+ /**
+ * Runs the given index summary redistribution, reporting it to the compaction metrics
+ * for the duration of the run.
+ *
+ * @param redistribution the redistribution to execute
+ * @return the list returned by {@code redistribution.redistributeSummaries()}
+ * @throws IOException if the redistribution throws one
+ */
+ public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+ {
+ metrics.beginCompaction(redistribution);
+
+ final List<SSTableReader> resampled;
+ try
+ {
+ resampled = redistribution.redistributeSummaries();
+ }
+ finally
+ {
+ // always paired with beginCompaction above, including when the run throws
+ metrics.finishCompaction(redistribution);
+ }
+ return resampled;
+ }
+
- static int getDefaultGcBefore(ColumnFamilyStore cfs)
+ public static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
// add any GcGrace however since 2ndary indexes are local to a node.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a14f13f,475b591..6b66ded
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -32,7 -32,7 +32,8 @@@ public enum OperationTyp
TOMBSTONE_COMPACTION("Tombstone Compaction"),
UNKNOWN("Unknown compaction type"),
ANTICOMPACTION("Anticompaction after repair"),
- VERIFY("Verify");
++ VERIFY("Verify"),
+ INDEX_SUMMARY("Index summary redistribution");
private final String type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 1dd3a4e,be5cc3c..4438dc1
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,19 -30,19 +26,20 @@@ import javax.management.MBeanServer
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.*;
-
- import org.apache.cassandra.io.sstable.format.SSTableReader;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
++import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.View;
+ import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@@ -57,13 -59,13 +56,6 @@@ public class IndexSummaryManager implem
private int resizeIntervalInMinutes = 0;
private long memoryPoolBytes;
-- // The target (or ideal) number of index summary entries must differ from the actual number of
-- // entries by this ratio in order to trigger an upsample or downsample of the summary. Because
-- // upsampling requires reading the primary index in order to rebuild the summary, the threshold
-- // for upsampling is is higher.
-- static final double UPSAMPLE_THRESHOLD = 1.5;
-- static final double DOWNSAMPLE_THESHOLD = 0.75;
--
private final DebuggableScheduledThreadPoolExecutor executor;
// our next scheduled resizing run
@@@ -251,267 -249,8 +243,8 @@@
* @return a list of new SSTableReader instances
*/
@VisibleForTesting
- public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes) throws IOException
+ public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
{
- logger.info("Redistributing index summaries");
- List<SSTableReader> oldFormatSSTables = new ArrayList<>();
- List<SSTableReader> redistribute = new ArrayList<>();
- for (LifecycleTransaction txn : transactions.values())
- {
- for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
- {
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel())
- {
- oldFormatSSTables.add(sstable);
- txn.cancel(sstable);
- }
- }
- redistribute.addAll(txn.originals());
- }
-
- long total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
- total += sstable.getIndexSummaryOffHeapSize();
-
- logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
- redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
-
- final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
- double totalReadsPerSec = 0.0;
- for (SSTableReader sstable : redistribute)
- {
- if (sstable.getReadMeter() != null)
- {
- Double readRate = sstable.getReadMeter().fifteenMinuteRate();
- totalReadsPerSec += readRate;
- readRates.put(sstable, readRate);
- }
- }
- logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
-
- // copy and sort by read rates (ascending)
- List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
- Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
-
- long remainingBytes = memoryPoolBytes;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
- remainingBytes -= sstable.getIndexSummaryOffHeapSize();
-
- logger.trace("Index summaries for compacting SSTables are using {} MB of space",
- (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
- List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
-
- for (LifecycleTransaction txn : transactions.values())
- txn.finish();
-
- total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
- total += sstable.getIndexSummaryOffHeapSize();
- logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
- total / 1024.0 / 1024.0);
-
- return newSSTables;
- }
-
- private static List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables, Map<UUID, LifecycleTransaction> transactions,
- double totalReadsPerSec, long memoryPoolCapacity) throws IOException
- {
-
- List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
- List<ResampleEntry> forceResample = new ArrayList<>();
- List<ResampleEntry> forceUpsample = new ArrayList<>();
- List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
-
- // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
- // to the number of total reads/sec it handles.
- long remainingSpace = memoryPoolCapacity;
- for (SSTableReader sstable : sstables)
- {
- int minIndexInterval = sstable.metadata.getMinIndexInterval();
- int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
-
- double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
- long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
-
- // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
- int currentNumEntries = sstable.getIndexSummarySize();
- double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
- long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
- int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
- int maxSummarySize = sstable.getMaxIndexSummarySize();
-
- // if the min_index_interval changed, calculate what our current sampling level would be under the new min
- if (sstable.getMinIndexInterval() != minIndexInterval)
- {
- int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
- maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
- logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
- sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
- currentSamplingLevel = effectiveSamplingLevel;
- }
-
- int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
- minIndexInterval, maxIndexInterval);
- int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
- double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
-
- logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
- "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
- sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
- currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
- numEntriesAtNewSamplingLevel * avgEntrySize);
-
- if (effectiveIndexInterval < minIndexInterval)
- {
- // The min_index_interval was changed; re-sample to match it.
- logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
- sstable, effectiveIndexInterval, minIndexInterval);
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else if (effectiveIndexInterval > maxIndexInterval)
- {
- // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
- logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
- sstable, effectiveIndexInterval, maxIndexInterval);
- newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
- numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
- }
- else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
- {
- long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
- toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
- remainingSpace -= spaceUsed;
- }
- else
- {
- // keep the same sampling level
- logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
- remainingSpace -= sstable.getIndexSummaryOffHeapSize();
- newSSTables.add(sstable);
- transactions.get(sstable.metadata.cfId).cancel(sstable);
- }
- totalReadsPerSec -= readsPerSec;
- }
-
- if (remainingSpace > 0)
- {
- Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
- toDownsample = result.right;
- newSSTables.addAll(result.left);
- for (SSTableReader sstable : result.left)
- transactions.get(sstable.metadata.cfId).cancel(sstable);
- }
-
- // downsample first, then upsample
- toDownsample.addAll(forceResample);
- toDownsample.addAll(toUpsample);
- toDownsample.addAll(forceUpsample);
- for (ResampleEntry entry : toDownsample)
- {
- SSTableReader sstable = entry.sstable;
- logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
- sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
- entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
- newSSTables.add(replacement);
- transactions.get(sstable.metadata.cfId).update(replacement, true);
- }
-
- return newSSTables;
- }
-
- @VisibleForTesting
- static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
- {
- // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
- // that will make little difference.
- Collections.sort(toDownsample, new Comparator<ResampleEntry>()
- {
- public int compare(ResampleEntry o1, ResampleEntry o2)
- {
- return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
- o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
- }
- });
-
- int noDownsampleCutoff = 0;
- List<SSTableReader> willNotDownsample = new ArrayList<>();
- while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
- {
- ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
-
- long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
- // see if we have enough leftover space to keep the current sampling level
- if (extraSpaceRequired <= remainingSpace)
- {
- logger.trace("Using leftover space to keep {} at the current sampling level ({})",
- entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
- willNotDownsample.add(entry.sstable);
- remainingSpace -= extraSpaceRequired;
- }
- else
- {
- break;
- }
-
- noDownsampleCutoff++;
- }
- return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
- }
-
- private static class ResampleEntry
- {
- public final SSTableReader sstable;
- public final long newSpaceUsed;
- public final int newSamplingLevel;
-
- public ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
- {
- this.sstable = sstable;
- this.newSpaceUsed = newSpaceUsed;
- this.newSamplingLevel = newSamplingLevel;
- }
- }
-
- /** Utility class for sorting sstables by their read rates. */
- private static class ReadRateComparator implements Comparator<SSTableReader>
- {
- private final Map<SSTableReader, Double> readRates;
-
- public ReadRateComparator(Map<SSTableReader, Double> readRates)
- {
- this.readRates = readRates;
- }
-
- @Override
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- Double readRate1 = readRates.get(o1);
- Double readRate2 = readRates.get(o2);
- if (readRate1 == null && readRate2 == null)
- return 0;
- else if (readRate1 == null)
- return -1;
- else if (readRate2 == null)
- return 1;
- else
- return Double.compare(readRate1, readRate2);
- }
- return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, nonCompacting, memoryPoolBytes));
++ return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
}
- }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,adb3e4e..aad479b
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,338 +1,349 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.io.sstable;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.UUID;
+
+ import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
++import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+
+ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
++ // The target (or ideal) number of index summary entries must differ from the actual number of
++ // entries by this ratio in order to trigger an upsample or downsample of the summary. Because
++ // upsampling requires reading the primary index in order to rebuild the summary, the threshold
++ // for upsampling is higher.
++ static final double UPSAMPLE_THRESHOLD = 1.5;
++ static final double DOWNSAMPLE_THESHOLD = 0.75;
++
+ private final List<SSTableReader> compacting;
- private final List<SSTableReader> nonCompacting;
++ private final Map<UUID, LifecycleTransaction> transactions;
+ private final long memoryPoolBytes;
++ private final UUID compactionId;
+ private volatile long remainingSpace;
+
- public IndexSummaryRedistribution(List<SSTableReader> compacting, List<SSTableReader> nonCompacting, long memoryPoolBytes)
++ public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+ {
+ this.compacting = compacting;
- this.nonCompacting = nonCompacting;
++ this.transactions = transactions;
+ this.memoryPoolBytes = memoryPoolBytes;
++ this.compactionId = UUID.randomUUID();
+ }
+
+ public List<SSTableReader> redistributeSummaries() throws IOException
+ {
- long total = 0;
- for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
- total += sstable.getIndexSummaryOffHeapSize();
-
++ logger.info("Redistributing index summaries");
+ List<SSTableReader> oldFormatSSTables = new ArrayList<>();
- for (SSTableReader sstable : nonCompacting)
++ List<SSTableReader> redistribute = new ArrayList<>();
++ for (LifecycleTransaction txn : transactions.values())
+ {
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
- if (!sstable.descriptor.version.hasSamplingLevel)
- oldFormatSSTables.add(sstable);
++ for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
++ {
++ // We can't change the sampling level of sstables with the old format, because the serialization format
++ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
++ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
++ if (!sstable.descriptor.version.hasSamplingLevel())
++ {
++ oldFormatSSTables.add(sstable);
++ txn.cancel(sstable);
++ }
++ }
++ redistribute.addAll(txn.originals());
+ }
- nonCompacting.removeAll(oldFormatSSTables);
+
- logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
- nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
++ long total = 0;
++ for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
++ total += sstable.getIndexSummaryOffHeapSize();
+
- final Map<SSTableReader, Double> readRates = new HashMap<>(nonCompacting.size());
++ logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
++ redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
++
++ final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
+ double totalReadsPerSec = 0.0;
- for (SSTableReader sstable : nonCompacting)
++ for (SSTableReader sstable : redistribute)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ if (sstable.getReadMeter() != null)
+ {
+ Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+ totalReadsPerSec += readRate;
+ readRates.put(sstable, readRate);
+ }
+ }
+ logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+ // copy and sort by read rates (ascending)
- List<SSTableReader> sstablesByHotness = new ArrayList<>(nonCompacting);
++ List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+ Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+ long remainingBytes = memoryPoolBytes;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+ remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+ logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+ (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
- List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
++ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
++
++ for (LifecycleTransaction txn : transactions.values())
++ txn.finish();
+
+ total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+ total += sstable.getIndexSummaryOffHeapSize();
- logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
++ logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+ total / 1024.0 / 1024.0);
+
+ return newSSTables;
+ }
+
+ private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
++ Map<UUID, LifecycleTransaction> transactions,
+ double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+ {
-
+ List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> forceResample = new ArrayList<>();
+ List<ResampleEntry> forceUpsample = new ArrayList<>();
+ List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+ // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+ // to the number of total reads/sec it handles.
+ remainingSpace = memoryPoolCapacity;
+ for (SSTableReader sstable : sstables)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ int minIndexInterval = sstable.metadata.getMinIndexInterval();
+ int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
+
+ double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+ long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+ // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+ int currentNumEntries = sstable.getIndexSummarySize();
+ double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+ long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+ int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+ int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+ // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+ if (sstable.getMinIndexInterval() != minIndexInterval)
+ {
+ int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+ maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+ logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+ sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+ currentSamplingLevel = effectiveSamplingLevel;
+ }
+
+ int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
- minIndexInterval, maxIndexInterval);
++ minIndexInterval, maxIndexInterval);
+ int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+ double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+ logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
- "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
- sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
- currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
- numEntriesAtNewSamplingLevel * avgEntrySize);
++ "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
++ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
++ currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
++ numEntriesAtNewSamplingLevel * avgEntrySize);
+
+ if (effectiveIndexInterval < minIndexInterval)
+ {
+ // The min_index_interval was changed; re-sample to match it.
- logger.debug("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
- sstable, effectiveIndexInterval, minIndexInterval);
++ logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
++ sstable, effectiveIndexInterval, minIndexInterval);
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else if (effectiveIndexInterval > maxIndexInterval)
+ {
+ // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
- logger.debug("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
- sstable, effectiveIndexInterval, maxIndexInterval);
++ logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
++ sstable, effectiveIndexInterval, maxIndexInterval);
+ newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+ numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
- else if (targetNumEntries >= currentNumEntries * IndexSummaryManager.UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
++ else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
- else if (targetNumEntries < currentNumEntries * IndexSummaryManager.DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
++ else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else
+ {
+ // keep the same sampling level
+ logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+ remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+ newSSTables.add(sstable);
++ transactions.get(sstable.metadata.cfId).cancel(sstable);
+ }
+ totalReadsPerSec -= readsPerSec;
+ }
+
+ if (remainingSpace > 0)
+ {
+ Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+ toDownsample = result.right;
+ newSSTables.addAll(result.left);
++ for (SSTableReader sstable : result.left)
++ transactions.get(sstable.metadata.cfId).cancel(sstable);
+ }
+
+ // downsample first, then upsample
+ toDownsample.addAll(forceResample);
+ toDownsample.addAll(toUpsample);
+ toDownsample.addAll(forceUpsample);
- Multimap<DataTracker, SSTableReader> replacedByTracker = HashMultimap.create();
- Multimap<DataTracker, SSTableReader> replacementsByTracker = HashMultimap.create();
-
- try
++ for (ResampleEntry entry : toDownsample)
+ {
- for (ResampleEntry entry : toDownsample)
- {
- if (isStopRequested())
- throw new CompactionInterruptedException(getCompactionInfo());
-
- SSTableReader sstable = entry.sstable;
- logger.debug("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
- sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
- entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
- ColumnFamilyStore cfs = Keyspace.open(sstable.getKeyspaceName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- DataTracker tracker = cfs.getDataTracker();
- SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
- newSSTables.add(replacement);
- replacedByTracker.put(tracker, sstable);
- replacementsByTracker.put(tracker, replacement);
- }
- }
- finally
- {
- for (DataTracker tracker : replacedByTracker.keySet())
- tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
++ if (isStopRequested())
++ throw new CompactionInterruptedException(getCompactionInfo());
++
++ SSTableReader sstable = entry.sstable;
++ logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
++ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
++ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
++ ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
++ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
++ newSSTables.add(replacement);
++ transactions.get(sstable.metadata.cfId).update(replacement, true);
+ }
+
+ return newSSTables;
+ }
+
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+ {
+ // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+ // that will make little difference.
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+ {
+ public int compare(ResampleEntry o1, ResampleEntry o2)
+ {
+ return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+ o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+ }
+ });
+
+ int noDownsampleCutoff = 0;
+ List<SSTableReader> willNotDownsample = new ArrayList<>();
+ while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+ {
+ ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+ long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+ // see if we have enough leftover space to keep the current sampling level
+ if (extraSpaceRequired <= remainingSpace)
+ {
+ logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+ entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+ willNotDownsample.add(entry.sstable);
+ remainingSpace -= extraSpaceRequired;
+ }
+ else
+ {
+ break;
+ }
+
+ noDownsampleCutoff++;
+ }
+ return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
- return new CompactionInfo(OperationType.INDEX_SUMMARY, (remainingSpace - memoryPoolBytes), memoryPoolBytes, "bytes");
++ return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+ }
+
+ /** Utility class for sorting sstables by their read rates. */
+ private static class ReadRateComparator implements Comparator<SSTableReader>
+ {
+ private final Map<SSTableReader, Double> readRates;
+
+ ReadRateComparator(Map<SSTableReader, Double> readRates)
+ {
+ this.readRates = readRates;
+ }
+
+ @Override
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ Double readRate1 = readRates.get(o1);
+ Double readRate2 = readRates.get(o2);
+ if (readRate1 == null && readRate2 == null)
+ return 0;
+ else if (readRate1 == null)
+ return -1;
+ else if (readRate2 == null)
+ return 1;
+ else
+ return Double.compare(readRate1, readRate2);
+ }
+ }
+
+ private static class ResampleEntry
+ {
+ public final SSTableReader sstable;
+ public final long newSpaceUsed;
+ public final int newSamplingLevel;
+
+ ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+ {
+ this.sstable = sstable;
+ this.newSpaceUsed = newSpaceUsed;
+ this.newSamplingLevel = newSamplingLevel;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1667494/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5e46b8e,63928e2..6935680
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -21,38 -21,33 +21,41 @@@ import java.io.IOException
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
--import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
- import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--import junit.framework.Assert;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.QueryFilter;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.RestorableMeter;
+import static com.google.common.collect.ImmutableMap.of;
+import static java.util.Arrays.asList;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
--import static org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
++import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@@ -106,8 -78,13 +109,13 @@@ public class IndexSummaryManagerTes
@After
public void afterTest()
{
+ for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+ {
+ holder.stop();
+ }
+
- String ksname = "Keyspace1";
- String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
+ String ksname = KEYSPACE1;
+ String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
Keyspace keyspace = Keyspace.open(ksname);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
cfs.metadata.minIndexInterval(originalMinIndexInterval);
@@@ -581,4 -507,59 +589,65 @@@
assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
}
}
+
+ @Test
+ public void testCancelIndex() throws Exception
+ {
- String ksname = "Keyspace1";
- String cfname = "StandardLowIndexInterval"; // index interval of 8, no key caching
++ String ksname = KEYSPACE1;
++ String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+ Keyspace keyspace = Keyspace.open(ksname);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
++ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ final int numSSTables = 4;
++ final int numTries = 4;
+ int numRows = 256;
+ createSSTables(ksname, cfname, numSSTables, numRows);
+
+ final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+ for (SSTableReader sstable : sstables)
+ sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+ final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+ // everything should get cut in half
+ final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
++
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
- redistributeSummaries(Collections.<SSTableReader>emptyList(), sstables, (singleSummaryOffHeapSpace * (numSSTables / 2)));
++ // Don't leave enough space for even the minimal index summaries
++ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
++ {
++ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
++ }
+ }
+ catch (CompactionInterruptedException ex)
+ {
+ exception.set(ex);
+ }
+ catch (IOException ignored)
+ {
+ }
+ }
+ });
+ t.start();
+ while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
- Thread.sleep(1);
++ Thread.yield();
+ CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ t.join();
+
+ assertNotNull("Expected compaction interrupted exception", exception.get());
+ assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+ Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
+ Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
+ Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+ assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+ Joiner.on(",").join(disjoint)),
+ disjoint.isEmpty());
+
+ validateData(cfs, numRows);
+ }
}
[3/4] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65885e7f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65885e7f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65885e7f
Branch: refs/heads/trunk
Commit: 65885e7fc356c342331aec11667b5abdc28897b6
Parents: b55523e f166749
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:34:21 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:37:41 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../cassandra/db/compaction/CompactionInfo.java | 14 +-
.../db/compaction/CompactionManager.java | 15 +
.../cassandra/db/compaction/OperationType.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 279 +--------------
.../io/sstable/IndexSummaryRedistribution.java | 349 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 80 ++++-
7 files changed, 462 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9c01160,5da0d42..5932dbb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,13 +1,37 @@@
-2.2.5
- * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
- * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
- * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
- * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
++3.0.2
+ Merged from 2.1:
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
- * Fix Stress profile parsing on Windows (CASSANDRA-10808)
-
-2.2.4
+3.0.1
+ * Avoid MV race during node decommission (CASSANDRA-10674)
+ * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
+ * Handle single-column deletions correctly in materialized views
+ when the column is part of the view primary key (CASSANDRA-10796)
+ * Fix issue with datadir migration on upgrade (CASSANDRA-10788)
+ * Fix bug with range tombstones on reverse queries and test coverage for
+ AbstractBTreePartition (CASSANDRA-10059)
+ * Remove 64k limit on collection elements (CASSANDRA-10374)
+ * Remove unclear Indexer.indexes() method (CASSANDRA-10690)
+ * Fix NPE on stream read error (CASSANDRA-10771)
+ * Normalize cqlsh DESC output (CASSANDRA-10431)
+ * Rejects partition range deletions when columns are specified (CASSANDRA-10739)
+ * Fix error when saving cached key for old format sstable (CASSANDRA-10778)
+ * Invalidate prepared statements on DROP INDEX (CASSANDRA-10758)
+ * Fix SELECT statement with IN restrictions on partition key,
+ ORDER BY and LIMIT (CASSANDRA-10729)
+ * Improve stress performance over 1k threads (CASSANDRA-7217)
+ * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
+ * Unable to create a function with argument of type Inet (CASSANDRA-10741)
+ * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717)
+ * Correctly preserve deletion info on updated rows when notifying indexers
+ of single-row deletions (CASSANDRA-10694)
+ * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
+ * Keep the file open in trySkipCache (CASSANDRA-10669)
+ * Updated trigger example (CASSANDRA-10257)
+Merged from 2.2:
+ * Fix regression on split size in CqlInputFormat (CASSANDRA-10835)
+ * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
+ * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
* Show CQL help in cqlsh in web browser (CASSANDRA-7225)
* Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
* Reject index queries while the index is building (CASSANDRA-8505)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ce7d2c,ba9c25e..bd950e3
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -50,11 -47,7 +50,12 @@@ import org.apache.cassandra.db.view.Vie
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
++import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@@ -1322,7 -1345,21 +1323,21 @@@ public class CompactionManager implemen
return executor.submit(runnable);
}
+ public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
+ {
+ metrics.beginCompaction(redistribution);
+
+ try
+ {
+ return redistribution.redistributeSummaries();
+ }
+ finally
+ {
+ metrics.finishCompaction(redistribution);
+ }
+ }
+
- public static int getDefaultGcBefore(ColumnFamilyStore cfs)
+ public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
// add any GcGrace however since 2ndary indexes are local to a node.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/OperationType.java
index a69622b,6b66ded..20e6df2
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@@ -33,13 -33,9 +33,14 @@@ public enum OperationTyp
UNKNOWN("Unknown compaction type"),
ANTICOMPACTION("Anticompaction after repair"),
VERIFY("Verify"),
+ FLUSH("Flush"),
+ STREAM("Stream"),
+ WRITE("Write"),
- VIEW_BUILD("View build");
++ VIEW_BUILD("View build"),
+ INDEX_SUMMARY("Index summary redistribution");
- private final String type;
+ public final String type;
+ public final String fileName;
OperationType(String type)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index e07f297,4438dc1..aed35c9
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -26,10 -26,8 +26,7 @@@ import javax.management.MBeanServer
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
-
- import org.apache.cassandra.db.lifecycle.SSTableSet;
- import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -40,6 -38,8 +37,9 @@@ import org.apache.cassandra.db.compacti
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.View;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0000000,aad479b..b4eae31
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -1,0 -1,349 +1,349 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.io.sstable;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.Pair;
+
+ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+
+ public class IndexSummaryRedistribution extends CompactionInfo.Holder
+ {
+ private static final Logger logger = LoggerFactory.getLogger(IndexSummaryRedistribution.class);
+
+ // The target (or ideal) number of index summary entries must differ from the actual number of
+ // entries by this ratio in order to trigger an upsample or downsample of the summary. Because
+ // upsampling requires reading the primary index in order to rebuild the summary, the threshold
+ // for upsampling is higher.
+ static final double UPSAMPLE_THRESHOLD = 1.5;
+ static final double DOWNSAMPLE_THESHOLD = 0.75;
+
+ private final List<SSTableReader> compacting;
+ private final Map<UUID, LifecycleTransaction> transactions;
+ private final long memoryPoolBytes;
+ private final UUID compactionId;
+ private volatile long remainingSpace;
+
+ public IndexSummaryRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes)
+ {
+ this.compacting = compacting;
+ this.transactions = transactions;
+ this.memoryPoolBytes = memoryPoolBytes;
+ this.compactionId = UUID.randomUUID();
+ }
+
+ public List<SSTableReader> redistributeSummaries() throws IOException
+ {
+ logger.info("Redistributing index summaries");
+ List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+ List<SSTableReader> redistribute = new ArrayList<>();
+ for (LifecycleTransaction txn : transactions.values())
+ {
+ for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
+ {
+ // We can't change the sampling level of sstables with the old format, because the serialization format
+ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
+ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+ if (!sstable.descriptor.version.hasSamplingLevel())
+ {
+ oldFormatSSTables.add(sstable);
+ txn.cancel(sstable);
+ }
+ }
+ redistribute.addAll(txn.originals());
+ }
+
+ long total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, redistribute))
+ total += sstable.getIndexSummaryOffHeapSize();
+
+ logger.trace("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
+ redistribute.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
+
+ final Map<SSTableReader, Double> readRates = new HashMap<>(redistribute.size());
+ double totalReadsPerSec = 0.0;
+ for (SSTableReader sstable : redistribute)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ if (sstable.getReadMeter() != null)
+ {
+ Double readRate = sstable.getReadMeter().fifteenMinuteRate();
+ totalReadsPerSec += readRate;
+ readRates.put(sstable, readRate);
+ }
+ }
+ logger.trace("Total reads/sec across all sstables in index summary resize process: {}", totalReadsPerSec);
+
+ // copy and sort by read rates (ascending)
+ List<SSTableReader> sstablesByHotness = new ArrayList<>(redistribute);
+ Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
+
+ long remainingBytes = memoryPoolBytes;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+ remainingBytes -= sstable.getIndexSummaryOffHeapSize();
+
+ logger.trace("Index summaries for compacting SSTables are using {} MB of space",
+ (memoryPoolBytes - remainingBytes) / 1024.0 / 1024.0);
+ List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, transactions, totalReadsPerSec, remainingBytes);
+
+ for (LifecycleTransaction txn : transactions.values())
+ txn.finish();
+
+ total = 0;
+ for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+ total += sstable.getIndexSummaryOffHeapSize();
+ logger.trace("Completed resizing of index summaries; current approximate memory used: {} MB",
+ total / 1024.0 / 1024.0);
+
+ return newSSTables;
+ }
+
+ private List<SSTableReader> adjustSamplingLevels(List<SSTableReader> sstables,
+ Map<UUID, LifecycleTransaction> transactions,
+ double totalReadsPerSec, long memoryPoolCapacity) throws IOException
+ {
+ List<ResampleEntry> toDownsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> toUpsample = new ArrayList<>(sstables.size() / 4);
+ List<ResampleEntry> forceResample = new ArrayList<>();
+ List<ResampleEntry> forceUpsample = new ArrayList<>();
+ List<SSTableReader> newSSTables = new ArrayList<>(sstables.size());
+
+ // Going from the coldest to the hottest sstables, try to give each sstable an amount of space proportional
+ // to the number of total reads/sec it handles.
+ remainingSpace = memoryPoolCapacity;
+ for (SSTableReader sstable : sstables)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
- int minIndexInterval = sstable.metadata.getMinIndexInterval();
- int maxIndexInterval = sstable.metadata.getMaxIndexInterval();
++ int minIndexInterval = sstable.metadata.params.minIndexInterval;
++ int maxIndexInterval = sstable.metadata.params.maxIndexInterval;
+
+ double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate();
+ long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec));
+
+ // figure out how many entries our idealSpace would buy us, and pick a new sampling level based on that
+ int currentNumEntries = sstable.getIndexSummarySize();
+ double avgEntrySize = sstable.getIndexSummaryOffHeapSize() / (double) currentNumEntries;
+ long targetNumEntries = Math.max(1, Math.round(idealSpace / avgEntrySize));
+ int currentSamplingLevel = sstable.getIndexSummarySamplingLevel();
+ int maxSummarySize = sstable.getMaxIndexSummarySize();
+
+ // if the min_index_interval changed, calculate what our current sampling level would be under the new min
+ if (sstable.getMinIndexInterval() != minIndexInterval)
+ {
+ int effectiveSamplingLevel = (int) Math.round(currentSamplingLevel * (minIndexInterval / (double) sstable.getMinIndexInterval()));
+ maxSummarySize = (int) Math.round(maxSummarySize * (sstable.getMinIndexInterval() / (double) minIndexInterval));
+ logger.trace("min_index_interval changed from {} to {}, so the current sampling level for {} is effectively now {} (was {})",
+ sstable.getMinIndexInterval(), minIndexInterval, sstable, effectiveSamplingLevel, currentSamplingLevel);
+ currentSamplingLevel = effectiveSamplingLevel;
+ }
+
+ int newSamplingLevel = IndexSummaryBuilder.calculateSamplingLevel(currentSamplingLevel, currentNumEntries, targetNumEntries,
+ minIndexInterval, maxIndexInterval);
+ int numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, maxSummarySize);
+ double effectiveIndexInterval = sstable.getEffectiveIndexInterval();
+
+ logger.trace("{} has {} reads/sec; ideal space for index summary: {} bytes ({} entries); considering moving " +
+ "from level {} ({} entries, {} bytes) to level {} ({} entries, {} bytes)",
+ sstable.getFilename(), readsPerSec, idealSpace, targetNumEntries, currentSamplingLevel, currentNumEntries,
+ currentNumEntries * avgEntrySize, newSamplingLevel, numEntriesAtNewSamplingLevel,
+ numEntriesAtNewSamplingLevel * avgEntrySize);
+
+ if (effectiveIndexInterval < minIndexInterval)
+ {
+ // The min_index_interval was changed; re-sample to match it.
+ logger.trace("Forcing resample of {} because the current index interval ({}) is below min_index_interval ({})",
+ sstable, effectiveIndexInterval, minIndexInterval);
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceResample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else if (effectiveIndexInterval > maxIndexInterval)
+ {
+ // The max_index_interval was lowered; force an upsample to the effective minimum sampling level
+ logger.trace("Forcing upsample of {} because the current index interval ({}) is above max_index_interval ({})",
+ sstable, effectiveIndexInterval, maxIndexInterval);
+ newSamplingLevel = Math.max(1, (BASE_SAMPLING_LEVEL * minIndexInterval) / maxIndexInterval);
+ numEntriesAtNewSamplingLevel = IndexSummaryBuilder.entriesAtSamplingLevel(newSamplingLevel, sstable.getMaxIndexSummarySize());
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ forceUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
+ else if (targetNumEntries >= currentNumEntries * UPSAMPLE_THRESHOLD && newSamplingLevel > currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toUpsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= avgEntrySize * numEntriesAtNewSamplingLevel;
+ }
+ else if (targetNumEntries < currentNumEntries * DOWNSAMPLE_THESHOLD && newSamplingLevel < currentSamplingLevel)
+ {
+ long spaceUsed = (long) Math.ceil(avgEntrySize * numEntriesAtNewSamplingLevel);
+ toDownsample.add(new ResampleEntry(sstable, spaceUsed, newSamplingLevel));
+ remainingSpace -= spaceUsed;
+ }
+ else
+ {
+ // keep the same sampling level
+ logger.trace("SSTable {} is within thresholds of ideal sampling", sstable);
+ remainingSpace -= sstable.getIndexSummaryOffHeapSize();
+ newSSTables.add(sstable);
+ transactions.get(sstable.metadata.cfId).cancel(sstable);
+ }
+ totalReadsPerSec -= readsPerSec;
+ }
+
+ if (remainingSpace > 0)
+ {
+ Pair<List<SSTableReader>, List<ResampleEntry>> result = distributeRemainingSpace(toDownsample, remainingSpace);
+ toDownsample = result.right;
+ newSSTables.addAll(result.left);
+ for (SSTableReader sstable : result.left)
+ transactions.get(sstable.metadata.cfId).cancel(sstable);
+ }
+
+ // downsample first, then upsample
+ toDownsample.addAll(forceResample);
+ toDownsample.addAll(toUpsample);
+ toDownsample.addAll(forceUpsample);
+ for (ResampleEntry entry : toDownsample)
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+
+ SSTableReader sstable = entry.sstable;
+ logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
+ sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
+ entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
+ ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+ SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+ newSSTables.add(replacement);
+ transactions.get(sstable.metadata.cfId).update(replacement, true);
+ }
+
+ return newSSTables;
+ }
+
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
+ {
+ // sort by the amount of space regained by doing the downsample operation; we want to try to avoid operations
+ // that will make little difference.
+ Collections.sort(toDownsample, new Comparator<ResampleEntry>()
+ {
+ public int compare(ResampleEntry o1, ResampleEntry o2)
+ {
+ return Double.compare(o1.sstable.getIndexSummaryOffHeapSize() - o1.newSpaceUsed,
+ o2.sstable.getIndexSummaryOffHeapSize() - o2.newSpaceUsed);
+ }
+ });
+
+ int noDownsampleCutoff = 0;
+ List<SSTableReader> willNotDownsample = new ArrayList<>();
+ while (remainingSpace > 0 && noDownsampleCutoff < toDownsample.size())
+ {
+ ResampleEntry entry = toDownsample.get(noDownsampleCutoff);
+
+ long extraSpaceRequired = entry.sstable.getIndexSummaryOffHeapSize() - entry.newSpaceUsed;
+ // see if we have enough leftover space to keep the current sampling level
+ if (extraSpaceRequired <= remainingSpace)
+ {
+ logger.trace("Using leftover space to keep {} at the current sampling level ({})",
+ entry.sstable, entry.sstable.getIndexSummarySamplingLevel());
+ willNotDownsample.add(entry.sstable);
+ remainingSpace -= extraSpaceRequired;
+ }
+ else
+ {
+ break;
+ }
+
+ noDownsampleCutoff++;
+ }
+ return Pair.create(willNotDownsample, toDownsample.subList(noDownsampleCutoff, toDownsample.size()));
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, "bytes", compactionId);
+ }
+
+ /** Utility class for sorting sstables by their read rates. */
+ private static class ReadRateComparator implements Comparator<SSTableReader>
+ {
+ private final Map<SSTableReader, Double> readRates;
+
+ ReadRateComparator(Map<SSTableReader, Double> readRates)
+ {
+ this.readRates = readRates;
+ }
+
+ @Override
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ Double readRate1 = readRates.get(o1);
+ Double readRate2 = readRates.get(o2);
+ if (readRate1 == null && readRate2 == null)
+ return 0;
+ else if (readRate1 == null)
+ return -1;
+ else if (readRate2 == null)
+ return 1;
+ else
+ return Double.compare(readRate1, readRate2);
+ }
+ }
+
+ private static class ResampleEntry
+ {
+ public final SSTableReader sstable;
+ public final long newSpaceUsed;
+ public final int newSamplingLevel;
+
+ ResampleEntry(SSTableReader sstable, long newSpaceUsed, int newSamplingLevel)
+ {
+ this.sstable = sstable;
+ this.newSpaceUsed = newSpaceUsed;
+ this.newSamplingLevel = newSamplingLevel;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65885e7f/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 5493edb,6935680..0498c68
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@@ -19,15 -19,12 +19,20 @@@ package org.apache.cassandra.io.sstable
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
++import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
++import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import com.google.common.base.Joiner;
+ import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@@ -39,18 -36,20 +44,22 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionInfo;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.filter.QueryFilter;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
import static com.google.common.collect.ImmutableMap.of;
import static java.util.Arrays.asList;
@@@ -587,7 -586,68 +601,67 @@@ public class IndexSummaryManagerTes
for (Map.Entry<String, Integer> entry : intervals.entrySet())
{
if (entry.getKey().contains(CF_STANDARDLOWiINTERVAL))
- assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
+ assertTrue(entry.getValue() >= cfs.metadata.params.minIndexInterval);
}
}
+
+ @Test
+ public void testCancelIndex() throws Exception
+ {
+ String ksname = KEYSPACE1;
+ String cfname = CF_STANDARDLOWiINTERVAL; // index interval of 8, no key caching
+ Keyspace keyspace = Keyspace.open(ksname);
+ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ final int numSSTables = 4;
- final int numTries = 4;
+ int numRows = 256;
+ createSSTables(ksname, cfname, numSSTables, numRows);
+
- final List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
++ final List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+ for (SSTableReader sstable : sstables)
+ sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
+
+ final long singleSummaryOffHeapSpace = sstables.get(0).getIndexSummaryOffHeapSize();
+
+ // everything should get cut in half
+ final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Don't leave enough space for even the minimal index summaries
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+ }
+ }
+ catch (CompactionInterruptedException ex)
+ {
+ exception.set(ex);
+ }
+ catch (IOException ignored)
+ {
+ }
+ }
+ });
+ t.start();
+ while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
- Thread.yield();
++ Thread.sleep(1);
+ CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ t.join();
+
+ assertNotNull("Expected compaction interrupted exception", exception.get());
+ assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+
+ Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
- Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getSSTables());
++ Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
+ Set<SSTableReader> disjoint = Sets.symmetricDifference(beforeRedistributionSSTables, afterCancelSSTables);
+ assertTrue(String.format("Mismatched files before and after cancelling redistribution: %s",
+ Joiner.on(",").join(disjoint)),
+ disjoint.isEmpty());
+
+ validateData(cfs, numRows);
+ }
}
[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f79c5ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f79c5ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f79c5ba
Branch: refs/heads/trunk
Commit: 3f79c5baae355924866b4997b18960b6590f5763
Parents: 2cd18ef 65885e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 11 17:43:02 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 11 17:43:02 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/db/compaction/CompactionInfo.java | 14 +-
.../db/compaction/CompactionManager.java | 15 +
.../cassandra/db/compaction/OperationType.java | 3 +-
.../io/sstable/IndexSummaryManager.java | 279 +--------------
.../io/sstable/IndexSummaryRedistribution.java | 349 +++++++++++++++++++
.../io/sstable/IndexSummaryManagerTest.java | 80 ++++-
7 files changed, 461 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79c5ba/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 296d3cd,5932dbb..fad4bb2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,7 +1,25 @@@
-3.0.2
+3.2
+ * Add compressor name in sstablemetadata output (CASSANDRA-9879)
+ * Fix type casting for counter columns (CASSANDRA-10824)
+ * Prevent running Cassandra as root (CASSANDRA-8142)
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
+ * Normalize all scripts (CASSANDRA-10679)
+ * Make compression ratio much more accurate (CASSANDRA-10225)
+ * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
+ * Make index building pluggable (CASSANDRA-10681)
+ * Add sstable flush observer (CASSANDRA-10678)
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
+ * Improve performance of the folderSize function (CASSANDRA-10677)
+ * Add support for type casting in selection clause (CASSANDRA-10310)
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+ Merged from 2.1:
+ * Allow cancellation of index summary redistribution (CASSANDRA-8805)
-3.0.1
+
+
+3.1
+Merged from 3.0:
* Avoid MV race during node decommission (CASSANDRA-10674)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Handle single-column deletions correctly in materialized views
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79c5ba/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------