You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/09/29 20:06:40 UTC
[03/10] cassandra git commit: Fix merkle tree depth calculation
Fix merkle tree depth calculation
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630
Branch: refs/heads/cassandra-3.X
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/CompactionManager.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++-------
6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
* Make Collections deserialization more robust (CASSANDRA-12618)
@@ -36,7 +37,6 @@
* Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
* Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
{
numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
public final int gcBefore;
+ private final boolean evenTreeDistribution;
// null when all rows with the min token have been consumed
private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
+ this(desc, initiator, gcBefore, false);
+ }
+
+ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+ {
this.desc = desc;
this.initiator = initiator;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
this.tree = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
return histbuild.buildWithStdevRangesAroundMean();
}
+ public long rowCount()
+ {
+ long count = 0;
+ for (TreeRange range : new TreeRangeIterator(this))
+ {
+ count += range.hashable.rowsInRange;
+ }
+ return count;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
for (int i = startRowKey; i <= endRowKey; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- @SuppressWarnings("unchecked")
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@ -131,8 +116,13 @@ public class ValidatorTest
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ assertNotNull(((ValidationComplete) m).tree);
}
private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertFalse(((ValidationComplete) m).success);
+ assertNull(((ValidationComplete) m).tree);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^(log2(n))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+ cfs.forceBlockingFlush();
+ assertEquals(1, cfs.getSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+ sstable.last.getToken()));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs), Collections.singleton(desc.range),
+ false, false);
+
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ MerkleTree tree = ((ValidationComplete) m).tree;
+
+ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+ assertEquals(tree.rowCount(), n);
+ }
+
+ private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+ {
+ final SettableFuture<MessageOut> future = SettableFuture.create();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
+ future.set(message);
return false;
}
@@ -192,13 +246,6 @@ public class ValidatorTest
return false;
}
});
-
- InetAddress remote = InetAddress.getByName("127.0.0.2");
-
- Validator validator = new Validator(desc, remote, 0);
- validator.fail();
-
- if (!lock.isSignaled())
- lock.await();
+ return future;
}
}