You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/27 19:27:53 UTC
[2/3] git commit: Adjust MT depth based on # of partitions validating
Adjust MT depth based on # of partitions validating
patch by yukim; reviewed by jbellis for CASSANDRA-5263
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef4a07b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef4a07b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef4a07b4
Branch: refs/heads/trunk
Commit: ef4a07b4b62eb448b6c1752250896fc861ff29a4
Parents: e7dbdd8
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jun 27 12:13:02 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jun 27 12:13:02 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 28 ++++++++++++++++-
.../org/apache/cassandra/repair/Validator.java | 32 +++++++-------------
.../apache/cassandra/repair/ValidatorTest.java | 10 +++---
.../cassandra/service/SerializationsTest.java | 11 ++++---
5 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7de31c5..b30e9a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.1.1
* Improve schema merge performance (CASSANDRA-7444)
* Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454)
+ * Adjust MT depth based on # of partitions validating (CASSANDRA-5263)
2.1.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 227f908..fed7ec7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -87,6 +87,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -892,13 +893,26 @@ public class CompactionManager implements CompactionManagerMBean
gcBefore = getDefaultGcBefore(cfs);
}
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // For now, we blindly assume that partitions are evenly distributed across all sstables.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+ }
+ // determine tree depth from the (natural) log of the number of partitions, capped at 20 to prevent an overly large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+
+ long start = System.nanoTime();
metrics.beginCompaction(ci);
try
{
// validate the CF as we iterate over it
- validator.prepare(cfs);
+ validator.prepare(cfs, tree);
while (iter.hasNext())
{
if (ci.isStopRequested())
@@ -919,6 +933,18 @@ public class CompactionManager implements CompactionManagerMBean
metrics.finishCompaction(ci);
}
+
+ if (logger.isDebugEnabled())
+ {
+ // computing the Merkle tree's serialized size may take time, so only do it when debug logging is enabled
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index d93b4a5..641717e 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -52,41 +51,32 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
- public final MerkleTree tree;
public final int gcBefore;
// null when all rows with the min token have been consumed
- private transient long validated;
- private transient MerkleTree.TreeRange range;
- private transient MerkleTree.TreeRangeIterator ranges;
- private transient DecoratedKey lastKey;
+ private long validated;
+ private MerkleTree tree;
+ // current range being updated
+ private MerkleTree.TreeRange range;
+ // iterator for iterating sub ranges (MT's leaves)
+ private MerkleTree.TreeRangeIterator ranges;
+ // last key seen
+ private DecoratedKey lastKey;
- /**
- * Create Validator with default size of initial Merkle Tree.
- */
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
- this(desc,
- initiator,
- // TODO: memory usage (maxsize) should either be tunable per
- // CF, globally, or as shared for all CFs in a cluster
- new MerkleTree(DatabaseDescriptor.getPartitioner(), desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)),
- gcBefore);
- }
-
- public Validator(RepairJobDesc desc, InetAddress initiator, MerkleTree tree, int gcBefore)
- {
this.desc = desc;
this.initiator = initiator;
- this.tree = tree;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
}
- public void prepare(ColumnFamilyStore cfs)
+ public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
+ this.tree = tree;
+
if (!tree.partitioner().preservesOrder())
{
// You can't beat an even tree distribution for md5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index c3ce810..4d65cdb 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
@@ -103,10 +104,11 @@ public class ValidatorTest extends SchemaLoader
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
Validator validator = new Validator(desc, remote, 0);
- validator.prepare(cfs);
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+ validator.prepare(cfs, tree);
// and confirm that the tree was split
- assertTrue(validator.tree.size() > 1);
+ assertTrue(tree.size() > 1);
// add a row
Token mid = partitioner.midpoint(range.left, range.right);
@@ -114,8 +116,8 @@ public class ValidatorTest extends SchemaLoader
validator.complete();
// confirm that the tree was validated
- Token min = validator.tree.partitioner().getMinimumToken();
- assertNotNull(validator.tree.hash(new Range<>(min, min)));
+ Token min = tree.partitioner().getMinimumToken();
+ assertNotNull(tree.hash(new Range<>(min, min)));
if (!lock.isSignaled())
lock.await();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 6937ceb..49632f9 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -93,17 +93,18 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testValidationCompleteWrite() throws IOException
{
+ IPartitioner p = new RandomPartitioner();
// empty validation
+ MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
- ValidationComplete c0 = new ValidationComplete(DESC, v0.tree);
+ ValidationComplete c0 = new ValidationComplete(DESC, mt);
// validation with a tree
- IPartitioner p = new RandomPartitioner();
- MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+ mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
for (int i = 0; i < 10; i++)
mt.split(p.getRandomToken());
- Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), mt, -1);
- ValidationComplete c1 = new ValidationComplete(DESC, v1.tree);
+ Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
+ ValidationComplete c1 = new ValidationComplete(DESC, mt);
// validation failed
ValidationComplete c3 = new ValidationComplete(DESC);