You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/27 19:27:53 UTC

[2/3] git commit: Adjust MT depth based on # of partitions validating

Adjust MT depth based on # of partitions validating

patch by yukim; reviewed by jbellis for CASSANDRA-5263


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef4a07b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef4a07b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef4a07b4

Branch: refs/heads/trunk
Commit: ef4a07b4b62eb448b6c1752250896fc861ff29a4
Parents: e7dbdd8
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jun 27 12:13:02 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jun 27 12:13:02 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 28 ++++++++++++++++-
 .../org/apache/cassandra/repair/Validator.java  | 32 +++++++-------------
 .../apache/cassandra/repair/ValidatorTest.java  | 10 +++---
 .../cassandra/service/SerializationsTest.java   | 11 ++++---
 5 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7de31c5..b30e9a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.1.1
  * Improve schema merge performance (CASSANDRA-7444)
  * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454)
+ * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
 
 
 2.1.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 227f908..fed7ec7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -87,6 +87,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -892,13 +893,26 @@ public class CompactionManager implements CompactionManagerMBean
                 gcBefore = getDefaultGcBefore(cfs);
         }
 
+        // Create Merkle tree suitable to hold estimated partitions for given range.
+        // We blindly assume that partition is evenly distributed on all sstables for now.
+        long numPartitions = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+        }
+        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
         CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+
+        long start = System.nanoTime();
         metrics.beginCompaction(ci);
         try
         {
             // validate the CF as we iterate over it
-            validator.prepare(cfs);
+            validator.prepare(cfs, tree);
             while (iter.hasNext())
             {
                 if (ci.isStopRequested())
@@ -919,6 +933,18 @@ public class CompactionManager implements CompactionManagerMBean
 
             metrics.finishCompaction(ci);
         }
+
+        if (logger.isDebugEnabled())
+        {
+            // MT serialize may take time
+            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                         duration,
+                         depth,
+                         numPartitions,
+                         MerkleTree.serializer.serializedSize(tree, 0),
+                         validator.desc);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index d93b4a5..641717e 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -52,41 +51,32 @@ public class Validator implements Runnable
 
     public final RepairJobDesc desc;
     public final InetAddress initiator;
-    public final MerkleTree tree;
     public final int gcBefore;
 
     // null when all rows with the min token have been consumed
-    private transient long validated;
-    private transient MerkleTree.TreeRange range;
-    private transient MerkleTree.TreeRangeIterator ranges;
-    private transient DecoratedKey lastKey;
+    private long validated;
+    private MerkleTree tree;
+    // current range being updated
+    private MerkleTree.TreeRange range;
+    // iterator for iterating sub ranges (MT's leaves)
+    private MerkleTree.TreeRangeIterator ranges;
+    // last key seen
+    private DecoratedKey lastKey;
 
-    /**
-     * Create Validator with default size of initial Merkle Tree.
-     */
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
-        this(desc,
-             initiator,
-             // TODO: memory usage (maxsize) should either be tunable per
-             // CF, globally, or as shared for all CFs in a cluster
-             new MerkleTree(DatabaseDescriptor.getPartitioner(), desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)),
-             gcBefore);
-    }
-
-    public Validator(RepairJobDesc desc, InetAddress initiator, MerkleTree tree, int gcBefore)
-    {
         this.desc = desc;
         this.initiator = initiator;
-        this.tree = tree;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
     }
 
-    public void prepare(ColumnFamilyStore cfs)
+    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
+        this.tree = tree;
+
         if (!tree.partitioner().preservesOrder())
         {
             // You can't beat an even tree distribution for md5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index c3ce810..4d65cdb 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
@@ -103,10 +104,11 @@ public class ValidatorTest extends SchemaLoader
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
         Validator validator = new Validator(desc, remote, 0);
-        validator.prepare(cfs);
+        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+        validator.prepare(cfs, tree);
 
         // and confirm that the tree was split
-        assertTrue(validator.tree.size() > 1);
+        assertTrue(tree.size() > 1);
 
         // add a row
         Token mid = partitioner.midpoint(range.left, range.right);
@@ -114,8 +116,8 @@ public class ValidatorTest extends SchemaLoader
         validator.complete();
 
         // confirm that the tree was validated
-        Token min = validator.tree.partitioner().getMinimumToken();
-        assertNotNull(validator.tree.hash(new Range<>(min, min)));
+        Token min = tree.partitioner().getMinimumToken();
+        assertNotNull(tree.hash(new Range<>(min, min)));
 
         if (!lock.isSignaled())
             lock.await();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef4a07b4/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 6937ceb..49632f9 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -93,17 +93,18 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testValidationCompleteWrite() throws IOException
     {
+        IPartitioner p = new RandomPartitioner();
         // empty validation
+        MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
         Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(),  -1);
-        ValidationComplete c0 = new ValidationComplete(DESC, v0.tree);
+        ValidationComplete c0 = new ValidationComplete(DESC, mt);
 
         // validation with a tree
-        IPartitioner p = new RandomPartitioner();
-        MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+        mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
         for (int i = 0; i < 10; i++)
             mt.split(p.getRandomToken());
-        Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), mt, -1);
-        ValidationComplete c1 = new ValidationComplete(DESC, v1.tree);
+        Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
+        ValidationComplete c1 = new ValidationComplete(DESC, mt);
 
         // validation failed
         ValidationComplete c3 = new ValidationComplete(DESC);