You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/09/29 20:06:39 UTC

[02/10] cassandra git commit: Fix merkle tree depth calculation

Fix merkle tree depth calculation

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630

Branch: refs/heads/cassandra-3.0
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 159 ++++++++++++-------
 6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  
  
@@ -36,7 +37,6 @@
  * Don't write shadowed range tombstone (CASSANDRA-12030)
 Merged from 2.1:
  * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) 
  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
-            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
     public final RepairJobDesc desc;
     public final InetAddress initiator;
     public final int gcBefore;
+    private final boolean evenTreeDistribution;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
+        this(desc, initiator, gcBefore, false);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+    {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
+        this.evenTreeDistribution = evenTreeDistribution;
     }
 
     public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
         this.tree = tree;
 
-        if (!tree.partitioner().preservesOrder())
+        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
         {
             // You can't beat an even tree distribution for md5
             tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
+    public long rowCount()
+    {
+        long count = 0;
+        for (TreeRange range : new TreeRangeIterator(this))
+        {
+            count += range.hashable.rowsInRange;
+        }
+        return count;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
         return store;
     }
 
-    private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+    public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
     {
         long timestamp = System.currentTimeMillis();
         for (int i = startRowKey; i <= endRowKey; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 
 public class ValidatorTest
 {
+    private static final long TEST_TIMEOUT = 60; //seconds
+
     private static final String keyspace = "ValidatorTest";
     private static final String columnFamily = "Standard1";
     private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
-        MessagingService.instance().addMessageSink(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-            {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
-                return false;
-            }
-
-            public boolean allowIncomingMessage(MessageIn message, int id)
-            {
-                return false;
-            }
-        });
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
@@ -131,8 +116,13 @@ public class ValidatorTest
         Token min = tree.partitioner().getMinimumToken();
         assertNotNull(tree.hash(new Range<>(min, min)));
 
-        if (!lock.isSignaled())
-            lock.await();
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        assertNotNull(((ValidationComplete) m).tree);
     }
 
     private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertFalse(((ValidationComplete) m).success);
+        assertNull(((ValidationComplete) m).tree);
+    }
+
+    @Test
+    public void simpleValidationTest128() throws Exception
+    {
+        simpleValidationTest(128);
+    }
+
+    @Test
+    public void simpleValidationTest1500() throws Exception
+    {
+        simpleValidationTest(1500);
+    }
+
+    /**
+     * Test for CASSANDRA-5263
+     * 1. Create N rows
+     * 2. Run validation compaction
+     * 3. Expect merkle tree with size 2^(log2(n))
+     */
+    public void simpleValidationTest(int n) throws Exception
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        // disable compaction while flushing
+        cfs.disableAutoCompaction();
+
+        CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+        cfs.forceBlockingFlush();
+        assertEquals(1, cfs.getSSTables().size());
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        UUID repairSessionId = UUIDGen.getTimeUUID();
+        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+                                                                                             sstable.last.getToken()));
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
+                                                                 false, false);
+
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+        CompactionManager.instance.submitValidation(cfs, validator);
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        MerkleTree tree = ((ValidationComplete) m).tree;
+
+        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+        assertEquals(tree.rowCount(), n);
+    }
+
+    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+    {
+        final SettableFuture<MessageOut> future = SettableFuture.create();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            @SuppressWarnings("unchecked")
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
+                future.set(message);
                 return false;
             }
 
@@ -192,13 +246,6 @@ public class ValidatorTest
                 return false;
             }
         });
-
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
-
-        Validator validator = new Validator(desc, remote, 0);
-        validator.fail();
-
-        if (!lock.isSignaled())
-            lock.await();
+        return future;
     }
 }