You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/23 15:02:25 UTC
[3/6] cassandra git commit: Allocate merkletrees with the correct size
Allocate merkletrees with the correct size
Patch by marcuse; reviewed by Marcus Olsson for CASSANDRA-11390
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d1bfae5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d1bfae5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d1bfae5
Branch: refs/heads/trunk
Commit: 1d1bfae580d44d3b8a4678c5af5767ff17102128
Parents: d479b8d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Mar 21 15:41:32 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 23 14:54:39 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 49 ++++++++++++++------
2 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 904206b..cf36047 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.5
+ * Allocate merkletrees with the correct size (CASSANDRA-11390)
* Support streaming pre-3.0 sstables (CASSANDRA-10990)
* Add backpressure to compressed commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 891f976..7c46fcb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1078,16 +1078,8 @@ public class CompactionManager implements CompactionManagerMBean
// Create Merkle trees suitable to hold estimated partitions for the given ranges.
// We blindly assume that a partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(validator.desc.ranges);
- }
// determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
- tree.addMerkleTrees((int) Math.pow(2, depth), validator.desc.ranges);
-
+ MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
long start = System.nanoTime();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
@@ -1114,15 +1106,11 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- if (logger.isTraceEnabled())
+ if (logger.isDebugEnabled())
{
- // MT serialize may take time
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.trace("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ logger.debug("Validation finished in {} msec, for {}",
duration,
- depth,
- numPartitions,
- MerkleTrees.serializer.serializedSize(tree, 0),
validator.desc);
}
}
@@ -1133,6 +1121,37 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ /**
+  * Builds one Merkle tree per requested range, sized from the estimated number of
+  * partitions the given sstables hold in that range.
+  * <p>
+  * A total depth budget of 20 is divided between the ranges: each range's maximum depth
+  * is 20 minus log2 of the inverse of its share of all estimated partitions, so a range
+  * holding half of the partitions may use depth 19, a quarter depth 18, and so on. Within
+  * that cap the depth is floor(log2(partitions in range)), and the tree is created with
+  * a maximum size of 2^depth.
+  *
+  * @param sstables sstables whose key estimates are summed for each range
+  * @param ranges   token ranges to create a tree for
+  * @param cfs      store supplying the partitioner used by the trees
+  * @return the container holding one tree added per entry of {@code ranges}
+  */
+ private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+ {
+     MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+     long allPartitions = 0;
+     Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+     // First pass: estimate the partitions per range and the grand total across ranges.
+     for (Range<Token> range : ranges)
+     {
+         long numPartitions = 0;
+         for (SSTableReader sstable : sstables)
+             numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
+         rangePartitionCounts.put(range, numPartitions);
+         allPartitions += numPartitions;
+     }
+
+     // Second pass: size each tree now that every range's share of the total is known.
+     for (Range<Token> range : ranges)
+     {
+         long numPartitions = rangePartitionCounts.get(range);
+         double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
+         // cap the depth in proportion to the range's share of all partitions (20 for a sole range)
+         int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
+         // The tree holds 2^depth leaves, so the depth must be a base-2 logarithm of the partition
+         // count (not the natural logarithm). 63 - numberOfLeadingZeros(n) is floor(log2(n)) for
+         // n > 0, computed exactly without floating-point rounding.
+         int log2Partitions = numPartitions > 0 ? 63 - Long.numberOfLeadingZeros(numPartitions) : 0;
+         // maxDepth goes negative for a range holding less than 2^-20 of all partitions; clamp at 0
+         // so the computed size is never truncated from a fractional power of two down to 0.
+         int depth = Math.max(0, Math.min(log2Partitions, maxDepth));
+         tree.addMerkleTree((int) Math.pow(2, depth), range);
+     }
+     if (logger.isDebugEnabled())
+     {
+         // MT serialize may take time
+         logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
+     }
+
+     return tree;
+ }
+
private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
{
Refs<SSTableReader> sstables;