You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/09/29 20:06:38 UTC

[01/10] cassandra git commit: Fix merkle tree depth calculation

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 238393520 -> c70ce6307
  refs/heads/cassandra-3.0 5cebd1fb0 -> 413e48e65
  refs/heads/cassandra-3.X 57e9a83b2 -> e3b34dc85
  refs/heads/trunk c18968b1b -> 87825f820


Fix merkle tree depth calculation

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630

Branch: refs/heads/cassandra-2.2
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 159 ++++++++++++-------
 6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  
  
@@ -36,7 +37,6 @@
  * Don't write shadowed range tombstone (CASSANDRA-12030)
 Merged from 2.1:
  * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) 
  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
-            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
     public final RepairJobDesc desc;
     public final InetAddress initiator;
     public final int gcBefore;
+    private final boolean evenTreeDistribution;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
+        this(desc, initiator, gcBefore, false);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+    {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
+        this.evenTreeDistribution = evenTreeDistribution;
     }
 
     public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
         this.tree = tree;
 
-        if (!tree.partitioner().preservesOrder())
+        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
         {
             // You can't beat an even tree distribution for md5
             tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
+    public long rowCount()
+    {
+        long count = 0;
+        for (TreeRange range : new TreeRangeIterator(this))
+        {
+            count += range.hashable.rowsInRange;
+        }
+        return count;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
         return store;
     }
 
-    private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+    public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
     {
         long timestamp = System.currentTimeMillis();
         for (int i = startRowKey; i <= endRowKey; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 
 public class ValidatorTest
 {
+    private static final long TEST_TIMEOUT = 60; //seconds
+
     private static final String keyspace = "ValidatorTest";
     private static final String columnFamily = "Standard1";
     private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
-        MessagingService.instance().addMessageSink(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-            {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
-                return false;
-            }
-
-            public boolean allowIncomingMessage(MessageIn message, int id)
-            {
-                return false;
-            }
-        });
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
@@ -131,8 +116,13 @@ public class ValidatorTest
         Token min = tree.partitioner().getMinimumToken();
         assertNotNull(tree.hash(new Range<>(min, min)));
 
-        if (!lock.isSignaled())
-            lock.await();
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        assertNotNull(((ValidationComplete) m).tree);
     }
 
     private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertFalse(((ValidationComplete) m).success);
+        assertNull(((ValidationComplete) m).tree);
+    }
+
+    @Test
+    public void simpleValidationTest128() throws Exception
+    {
+        simpleValidationTest(128);
+    }
+
+    @Test
+    public void simpleValidationTest1500() throws Exception
+    {
+        simpleValidationTest(1500);
+    }
+
+    /**
+     * Test for CASSANDRA-5263
+     * 1. Create N rows
+     * 2. Run validation compaction
+     * 3. Expect merkle tree with size 2^ceil(log2(n))
+     */
+    public void simpleValidationTest(int n) throws Exception
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        // disable compaction while flushing
+        cfs.disableAutoCompaction();
+
+        CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=0
+
+        cfs.forceBlockingFlush();
+        assertEquals(1, cfs.getSSTables().size());
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        UUID repairSessionId = UUIDGen.getTimeUUID();
+        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+                                                                                             sstable.last.getToken()));
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
+                                                                 false, false);
+
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+        CompactionManager.instance.submitValidation(cfs, validator);
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        MerkleTree tree = ((ValidationComplete) m).tree;
+
+        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+        assertEquals(tree.rowCount(), n);
+    }
+
+    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+    {
+        final SettableFuture<MessageOut> future = SettableFuture.create();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            @SuppressWarnings("unchecked")
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
+                future.set(message);
                 return false;
             }
 
@@ -192,13 +246,6 @@ public class ValidatorTest
                 return false;
             }
         });
-
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
-
-        Validator validator = new Validator(desc, remote, 0);
-        validator.fail();
-
-        if (!lock.isSignaled())
-            lock.await();
+        return future;
     }
 }


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6

Branch: refs/heads/cassandra-3.0
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
 -2.2.9
 +3.0.10
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
+  * Fix merkle tree depth calculation (CASSANDRA-12580)
+  * Make Collections deserialization more robust (CASSANDRA-12618)
 - 
 - 
 -2.2.8
   * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
   * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block
    (CASSANDRA-12554)
 +Merged from 2.1:
-  * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
 +
 +
 +3.0.9
 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
 + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
 + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
 + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
 + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
 +Merged from 2.2:
   * Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
   * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
   * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;
                  else
 -                    gcBefore = getDefaultGcBefore(cfs);
 +                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
              }
  
 -            // Create Merkle tree suitable to hold estimated partitions for given range.
 -            // We blindly assume that partition is evenly distributed on all sstables for now.
 -            long numPartitions = 0;
 -            for (SSTableReader sstable : sstables)
 -            {
 -                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
 -            }
 -            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
 -            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
 -            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 -
 +            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +            // We blindly assume that a partition is evenly distributed on all sstables for now.
-             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
 +            MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
              long start = System.nanoTime();
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
 +            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
 +                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
 +                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
              {
 -                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (ci.hasNext())
                  {
 -                    // validate the CF as we iterate over it
 -                    validator.prepare(cfs, tree);
 -                    while (iter.hasNext())
 +                    if (ci.isStopRequested())
 +                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    try (UnfilteredRowIterator partition = ci.next())
                      {
 -                        if (ci.isStopRequested())
 -                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                        AbstractCompactedRow row = iter.next();
 -                        validator.add(row);
 +                        validator.add(partition);
                      }
 -                    validator.complete();
                  }
 -                finally
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                if (isSnapshotValidation && !isGlobalSnapshotValidation)
                  {
                      // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
                      // is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
          }
      }
  
 +    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = 0;
 +        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = 0;
 +            for (SSTableReader sstable : sstables)
 +                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
 +            rangePartitionCounts.put(range, numPartitions);
 +            allPartitions += numPartitions;
 +        }
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++            // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++            // capping at 20 to prevent large tree (CASSANDRA-11390)
 +            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
-             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
      private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
      {
          Refs<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
          validated = 0;
          range = null;
          ranges = null;
+         this.evenTreeDistribution = evenTreeDistribution;
      }
  
 -    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
 +    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
      {
 -        this.tree = tree;
 +        this.trees = tree;
  
-         if (!tree.partitioner().preservesOrder())
+         if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
          {
              // You can't beat an even tree distribution for md5
              tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.utils;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.PeekingIterator;
 +import org.slf4j.Logger;
 +
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +
 +
 +/**
 + * Wrapper class for handling of multiple MerkleTrees at once.
 + * 
 + * The individual MerkleTree instances are divided into non-overlapping token Ranges.
 + */
 +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
 +{
 +    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
 +
 +    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
 +
 +    private IPartitioner partitioner;
 +
 +    /**
 +     * Creates empty MerkleTrees object.
 +     * 
 +     * @param partitioner The partitioner to use
 +     */
 +    public MerkleTrees(IPartitioner partitioner)
 +    {
 +        this(partitioner, new ArrayList<>());
 +    }
 +
 +    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
 +    {
 +        this.partitioner = partitioner;
 +        addTrees(merkleTrees);
 +    }
 +
 +    /**
 +     * Get the ranges that these merkle trees cover.
 +     * 
 +     * @return
 +     */
 +    public Collection<Range<Token>> ranges()
 +    {
 +        return merkleTrees.keySet();
 +    }
 +
 +    /**
 +     * Get the partitioner in use.
 +     * 
 +     * @return
 +     */
 +    public IPartitioner partitioner()
 +    {
 +        return partitioner;
 +    }
 +
 +    /**
 +     * Add one merkle tree per given range, each with the defined maxsize.
 +     * 
 +     * @param maxsize
 +     * @param ranges
 +     */
 +    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
 +    {
 +        for (Range<Token> range : ranges)
 +        {
 +            addMerkleTree(maxsize, range);
 +        }
 +    }
 +
 +    /**
 +     * Add a MerkleTree with the defined size and range.
 +     * 
 +     * @param maxsize
 +     * @param range
 +     * @return The created merkle tree.
 +     */
 +    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
 +    {
 +        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
 +    }
 +
 +    @VisibleForTesting
 +    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
 +    {
 +        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
 +        addTree(tree);
 +
 +        return tree;
 +    }
 +
 +    /**
 +     * Get the MerkleTree.TreeRange responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    @VisibleForTesting
 +    public MerkleTree.TreeRange get(Token t)
 +    {
 +        return getMerkleTree(t).get(t);
 +    }
 +
 +    /**
 +     * Init every MerkleTree with an even tree distribution.
 +     */
 +    public void init()
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            init(range);
 +        }
 +    }
 +
 +    /**
 +     * Init a selected MerkleTree with an even tree distribution.
 +     * 
 +     * @param range
 +     */
 +    public void init(Range<Token> range)
 +    {
 +        merkleTrees.get(range).init();
 +    }
 +
 +    /**
 +     * Split the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    public boolean split(Token t)
 +    {
 +        return getMerkleTree(t).split(t);
 +    }
 +
 +    /**
 +     * Invalidate the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     */
 +    @VisibleForTesting
 +    public void invalidate(Token t)
 +    {
 +        getMerkleTree(t).invalidate(t);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token range.
 +     * 
 +     * @param range
 +     * @return
 +     */
 +    public MerkleTree getMerkleTree(Range<Token> range)
 +    {
 +        return merkleTrees.get(range);
 +    }
 +
 +    public long size()
 +    {
 +        long size = 0;
 +
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            size += tree.size();
 +        }
 +
 +        return size;
 +    }
 +
 +    @VisibleForTesting
 +    public void maxsize(Range<Token> range, int maxsize)
 +    {
 +        getMerkleTree(range).maxsize(maxsize);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return The MerkleTree whose range contains the token; an AssertionError is thrown if none exists.
 +     */
 +    private MerkleTree getMerkleTree(Token t)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (range.contains(t))
 +                return merkleTrees.get(range);
 +        }
 +
 +        throw new AssertionError("Expected tree for token " + t);
 +    }
 +
 +    private void addTrees(Collection<MerkleTree> trees)
 +    {
 +        for (MerkleTree tree : trees)
 +        {
 +            addTree(tree);
 +        }
 +    }
 +
 +    private void addTree(MerkleTree tree)
 +    {
 +        assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
 +
 +        merkleTrees.put(tree.fullRange, tree);
 +    }
 +
 +    private boolean validateNonOverlapping(MerkleTree tree)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (tree.fullRange.intersects(range))
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Get an iterator for all the invalids generated by the MerkleTrees.
 +     * 
 +     * @return
 +     */
 +    public TreeRangeIterator invalids()
 +    {
 +        return new TreeRangeIterator();
 +    }
 +
 +    /**
 +     * Log the row count per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowCountPerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowCountPerLeaf().log(logger);
 +        }
 +    }
 +
 +    /**
 +     * Log the row size per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowSizePerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowSizePerLeaf().log(logger);
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public byte[] hash(Range<Token> range)
 +    {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        boolean hashed = false;
 +
 +        try
 +        {
 +            for (Range<Token> rt : merkleTrees.keySet())
 +            {
 +                if (rt.intersects(range))
 +                {
 +                    byte[] bytes = merkleTrees.get(rt).hash(range);
 +                    if (bytes != null)
 +                    {
 +                        baos.write(bytes);
 +                        hashed = true;
 +                    }
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException("Unable to append merkle tree hash to result");
 +        }
 +        
 +        return hashed ? baos.toByteArray() : null;
 +    }
 +
 +    /**
 +     * Get an iterator of all ranges and their MerkleTrees.
 +     */
 +    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
 +    {
 +        return merkleTrees.entrySet().iterator();
 +    }
 +
++    public long rowCount()
++    {
++        long totalCount = 0;
++        for (MerkleTree tree : merkleTrees.values())
++        {
++            totalCount += tree.rowCount();
++        }
++        return totalCount;
++    }
++
 +    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
 +            Iterable<MerkleTree.TreeRange>,
 +            PeekingIterator<MerkleTree.TreeRange>
 +    {
 +        private final Iterator<MerkleTree> it;
 +
 +        private MerkleTree.TreeRangeIterator current = null;
 +
 +        private TreeRangeIterator()
 +        {
 +            it = merkleTrees.values().iterator();
 +        }
 +
 +        public MerkleTree.TreeRange computeNext()
 +        {
 +            if (current == null || !current.hasNext())
 +                return nextIterator();
 +
 +            return current.next();
 +        }
 +
 +        private MerkleTree.TreeRange nextIterator()
 +        {
 +            if (it.hasNext())
 +            {
 +                current = it.next().invalids();
 +
 +                return current.next();
 +            }
 +
 +            return endOfData();
 +        }
 +
 +        public Iterator<MerkleTree.TreeRange> iterator()
 +        {
 +            return this;
 +        }
 +    }
 +
 +    /**
 +     * Get the differences between the two sets of MerkleTrees.
 +     * 
 +     * @param ltree
 +     * @param rtree
 +     * @return
 +     */
 +    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
 +    {
 +        List<Range<Token>> differences = new ArrayList<>();
 +        for (MerkleTree tree : ltree.merkleTrees.values())
 +        {
 +            differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
 +        }
 +        return differences;
 +    }
 +
 +    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
 +    {
 +        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeInt(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                MerkleTree.serializer.serialize(tree, out, version);
 +            }
 +        }
 +
 +        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            IPartitioner partitioner = null;
 +            int nTrees = in.readInt();
 +            Collection<MerkleTree> trees = new ArrayList<>(nTrees);
 +            if (nTrees > 0)
 +            {
 +                for (int i = 0; i < nTrees; i++)
 +                {
 +                    MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
 +                    trees.add(tree);
 +
 +                    if (partitioner == null)
 +                        partitioner = tree.partitioner();
 +                    else
 +                        assert tree.partitioner() == partitioner;
 +                }
 +            }
 +
 +            return new MerkleTrees(partitioner, trees);
 +        }
 +
 +        public long serializedSize(MerkleTrees trees, int version)
 +        {
 +            assert trees != null;
 +
 +            long size = TypeSizes.sizeof(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                size += MerkleTree.serializer.serializedSize(tree, version);
 +            }
 +            return size;
 +        }
 +
 +    }
 +
 +    private static class TokenRangeComparator implements Comparator<Range<Token>>
 +    {
 +        @Override
 +        public int compare(Range<Token> rt1, Range<Token> rt2)
 +        {
 +            if (rt1.left.compareTo(rt2.left) == 0)
 +                return 0;
 +
 +            return rt1.compareTo(rt2);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
          return store;
      }
  
-     private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+     public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
      {
          long timestamp = System.currentTimeMillis();
 +        CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
          for (int i = startRowKey; i <= endRowKey; i++)
          {
              DecoratedKey key = Util.dk(Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
- 
  package org.apache.cassandra.repair;
  
 -import java.io.IOException;
  import java.net.InetAddress;
 -import java.security.MessageDigest;
 +import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
  import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
  
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.util.SequentialWriter;
  import org.junit.After;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.repair.messages.RepairMessage;
  import org.apache.cassandra.repair.messages.ValidationComplete;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
 -import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
 -import static org.junit.Assert.*;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
  
  public class ValidatorTest
  {
+     private static final long TEST_TIMEOUT = 60; //seconds
+ 
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
 -    private final IPartitioner partitioner = StorageService.getPartitioner();
 +    private static IPartitioner partitioner;
  
      @BeforeClass
      public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
      public void testValidatorComplete() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
-         MessagingService.instance().addMessageSink(new IMessageSink()
-         {
-             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-             {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertTrue(((ValidationComplete) m).success());
-                         assertNotNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
-                 return false;
-             }
- 
-             public boolean allowIncomingMessage(MessageIn message, int id)
-             {
-                 return false;
-             }
-         });
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
  
          InetAddress remote = InetAddress.getByName("127.0.0.2");
  
@@@ -130,37 -116,128 +118,111 @@@
          Token min = tree.partitioner().getMinimumToken();
          assertNotNull(tree.hash(new Range<>(min, min)));
  
-         if (!lock.isSignaled())
-             lock.await();
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        assertNotNull(((ValidationComplete) m).tree);
++        assertTrue(((ValidationComplete) m).success());
++        assertNotNull(((ValidationComplete) m).trees);
      }
  
 -    private static class CompactedRowStub extends AbstractCompactedRow
 -    {
 -        private CompactedRowStub(DecoratedKey key)
 -        {
 -            super(key);
 -        }
 -
 -        public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void update(MessageDigest digest) { }
 -
 -        public ColumnStats columnStats()
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void close() throws IOException { }
 -    }
  
      @Test
      public void testValidatorFailed() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ 
+         InetAddress remote = InetAddress.getByName("127.0.0.2");
+ 
+         Validator validator = new Validator(desc, remote, 0);
+         validator.fail();
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertFalse(((ValidationComplete) m).success);
 -        assertNull(((ValidationComplete) m).tree);
++        assertFalse(((ValidationComplete) m).success());
++        assertNull(((ValidationComplete) m).trees);
+     }
+ 
+     @Test
+     public void simpleValidationTest128() throws Exception
+     {
+         simpleValidationTest(128);
+     }
+ 
+     @Test
+     public void simpleValidationTest1500() throws Exception
+     {
+         simpleValidationTest(1500);
+     }
+ 
+     /**
+      * Test for CASSANDRA-5263
+      * 1. Create N rows
+      * 2. Run validation compaction
+      * 3. Expect merkle tree with size 2^(ceil(log2(n)))
+      */
+     public void simpleValidationTest(int n) throws Exception
+     {
+         Keyspace ks = Keyspace.open(keyspace);
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         // disable compaction while flushing
+         cfs.disableAutoCompaction();
+ 
+         CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+ 
+         cfs.forceBlockingFlush();
 -        assertEquals(1, cfs.getSSTables().size());
++        assertEquals(1, cfs.getLiveSSTables().size());
+ 
+         // wait enough to force single compaction
+         TimeUnit.SECONDS.sleep(5);
+ 
 -        SSTableReader sstable = cfs.getSSTables().iterator().next();
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+         UUID repairSessionId = UUIDGen.getTimeUUID();
+         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
 -                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
 -                                                                                             sstable.last.getToken()));
++                                               cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                                                                sstable.last.getToken())));
+ 
+         ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
 -                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
 -                                                                 false, false);
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false);
+ 
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+         CompactionManager.instance.submitValidation(cfs, validator);
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        MerkleTree tree = ((ValidationComplete) m).tree;
++        assertTrue(((ValidationComplete) m).success());
++        MerkleTrees trees = ((ValidationComplete) m).trees;
+ 
 -        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
 -        assertEquals(tree.rowCount(), n);
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        while (iterator.hasNext())
++        {
++            assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++        }
++        assertEquals(trees.rowCount(), n);
+     }
+ 
 -    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++    private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+     {
 -        final SettableFuture<MessageOut> future = SettableFuture.create();
++        final CompletableFuture<MessageOut> future = new CompletableFuture<>();
          MessagingService.instance().addMessageSink(new IMessageSink()
          {
              public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
              {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertFalse(((ValidationComplete) m).success());
-                         assertNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
 -                future.set(message);
++                future.complete(message);
                  return false;
              }
  


[02/10] cassandra git commit: Fix merkle tree depth calculation

Posted by yu...@apache.org.
Fix merkle tree depth calculation

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630

Branch: refs/heads/cassandra-3.0
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 159 ++++++++++++-------
 6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  
  
@@ -36,7 +37,6 @@
  * Don't write shadowed range tombstone (CASSANDRA-12030)
 Merged from 2.1:
  * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) 
  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
-            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
     public final RepairJobDesc desc;
     public final InetAddress initiator;
     public final int gcBefore;
+    private final boolean evenTreeDistribution;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
+        this(desc, initiator, gcBefore, false);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+    {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
+        this.evenTreeDistribution = evenTreeDistribution;
     }
 
     public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
         this.tree = tree;
 
-        if (!tree.partitioner().preservesOrder())
+        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
         {
             // You can't beat an even tree distribution for md5
             tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
+    public long rowCount()
+    {
+        long count = 0;
+        for (TreeRange range : new TreeRangeIterator(this))
+        {
+            count += range.hashable.rowsInRange;
+        }
+        return count;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
         return store;
     }
 
-    private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+    public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
     {
         long timestamp = System.currentTimeMillis();
         for (int i = startRowKey; i <= endRowKey; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 
 public class ValidatorTest
 {
+    private static final long TEST_TIMEOUT = 60; //seconds
+
     private static final String keyspace = "ValidatorTest";
     private static final String columnFamily = "Standard1";
     private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
-        MessagingService.instance().addMessageSink(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-            {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
-                return false;
-            }
-
-            public boolean allowIncomingMessage(MessageIn message, int id)
-            {
-                return false;
-            }
-        });
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
@@ -131,8 +116,13 @@ public class ValidatorTest
         Token min = tree.partitioner().getMinimumToken();
         assertNotNull(tree.hash(new Range<>(min, min)));
 
-        if (!lock.isSignaled())
-            lock.await();
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        assertNotNull(((ValidationComplete) m).tree);
     }
 
     private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertFalse(((ValidationComplete) m).success);
+        assertNull(((ValidationComplete) m).tree);
+    }
+
+    @Test
+    public void simpleValidationTest128() throws Exception
+    {
+        simpleValidationTest(128);
+    }
+
+    @Test
+    public void simpleValidationTest1500() throws Exception
+    {
+        simpleValidationTest(1500);
+    }
+
+    /**
+     * Test for CASSANDRA-5263
+     * 1. Create N rows
+     * 2. Run validation compaction
+     * 3. Expect merkle tree with size 2^ceil(log2(n)), i.e. n rounded up to the next power of two
+     */
+    public void simpleValidationTest(int n) throws Exception
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        // disable compaction while flushing
+        cfs.disableAutoCompaction();
+
+        CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); // ttl argument is 0
+
+        cfs.forceBlockingFlush();
+        assertEquals(1, cfs.getSSTables().size());
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        UUID repairSessionId = UUIDGen.getTimeUUID();
+        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+                                                                                             sstable.last.getToken()));
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
+                                                                 false, false);
+
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+        CompactionManager.instance.submitValidation(cfs, validator);
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        MerkleTree tree = ((ValidationComplete) m).tree;
+
+        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+        assertEquals(tree.rowCount(), n);
+    }
+
+    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+    {
+        final SettableFuture<MessageOut> future = SettableFuture.create();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            @SuppressWarnings("unchecked")
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
+                future.set(message);
                 return false;
             }
 
@@ -192,13 +246,6 @@ public class ValidatorTest
                 return false;
             }
         });
-
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
-
-        Validator validator = new Validator(desc, remote, 0);
-        validator.fail();
-
-        if (!lock.isSignaled())
-            lock.await();
+        return future;
     }
 }


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6

Branch: refs/heads/trunk
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
 -2.2.9
 +3.0.10
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
+  * Fix merkle tree depth calculation (CASSANDRA-12580)
+  * Make Collections deserialization more robust (CASSANDRA-12618)
 - 
 - 
 -2.2.8
   * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
   * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block
    (CASSANDRA-12554)
 +Merged from 2.1:
-  * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
 +
 +
 +3.0.9
 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
 + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
 + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
 + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
 + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
 +Merged from 2.2:
   * Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
   * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
   * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;
                  else
 -                    gcBefore = getDefaultGcBefore(cfs);
 +                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
              }
  
 -            // Create Merkle tree suitable to hold estimated partitions for given range.
 -            // We blindly assume that partition is evenly distributed on all sstables for now.
 -            long numPartitions = 0;
 -            for (SSTableReader sstable : sstables)
 -            {
 -                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
 -            }
 -            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
 -            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
 -            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 -
 +            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +            // We blindly assume that a partition is evenly distributed on all sstables for now.
-             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
 +            MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
              long start = System.nanoTime();
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
 +            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
 +                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
 +                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
              {
 -                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (ci.hasNext())
                  {
 -                    // validate the CF as we iterate over it
 -                    validator.prepare(cfs, tree);
 -                    while (iter.hasNext())
 +                    if (ci.isStopRequested())
 +                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    try (UnfilteredRowIterator partition = ci.next())
                      {
 -                        if (ci.isStopRequested())
 -                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                        AbstractCompactedRow row = iter.next();
 -                        validator.add(row);
 +                        validator.add(partition);
                      }
 -                    validator.complete();
                  }
 -                finally
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                if (isSnapshotValidation && !isGlobalSnapshotValidation)
                  {
                      // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
                      // is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
          }
      }
  
 +    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = 0;
 +        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = 0;
 +            for (SSTableReader sstable : sstables)
 +                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
 +            rangePartitionCounts.put(range, numPartitions);
 +            allPartitions += numPartitions;
 +        }
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++            // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++            // capping at 20 to prevent large tree (CASSANDRA-11390)
 +            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
-             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
      private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
      {
          Refs<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
          validated = 0;
          range = null;
          ranges = null;
+         this.evenTreeDistribution = evenTreeDistribution;
      }
  
 -    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
 +    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
      {
 -        this.tree = tree;
 +        this.trees = tree;
  
-         if (!tree.partitioner().preservesOrder())
+         if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
          {
              // You can't beat an even tree distribution for md5
              tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.utils;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.PeekingIterator;
 +import org.slf4j.Logger;
 +
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +
 +
 +/**
 + * Wrapper class for handling of multiple MerkleTrees at once.
 + * 
 + * The MerkleTrees are keyed by non-overlapping token ranges.
 + */
 +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
 +{
 +    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
 +
 +    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
 +
 +    private IPartitioner partitioner;
 +
 +    /**
 +     * Creates empty MerkleTrees object.
 +     * 
 +     * @param partitioner The partitioner to use
 +     */
 +    public MerkleTrees(IPartitioner partitioner)
 +    {
 +        this(partitioner, new ArrayList<>());
 +    }
 +
 +    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
 +    {
 +        this.partitioner = partitioner;
 +        addTrees(merkleTrees);
 +    }
 +
 +    /**
 +     * Get the ranges that these merkle trees cover.
 +     * 
 +     * @return the token ranges used as keys of the underlying range-to-tree map
 +     */
 +    public Collection<Range<Token>> ranges()
 +    {
 +        return merkleTrees.keySet();
 +    }
 +
 +    /**
 +     * Get the partitioner in use.
 +     * 
 +     * @return
 +     */
 +    public IPartitioner partitioner()
 +    {
 +        return partitioner;
 +    }
 +
 +    /**
 +     * Add one merkle tree per given range, each with the defined maxsize.
 +     * 
 +     * @param maxsize
 +     * @param ranges
 +     */
 +    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
 +    {
 +        for (Range<Token> range : ranges)
 +        {
 +            addMerkleTree(maxsize, range);
 +        }
 +    }
 +
 +    /**
 +     * Add a MerkleTree with the defined size and range.
 +     * 
 +     * @param maxsize
 +     * @param range
 +     * @return The created merkle tree.
 +     */
 +    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
 +    {
 +        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
 +    }
 +
 +    @VisibleForTesting
 +    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
 +    {
 +        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
 +        addTree(tree);
 +
 +        return tree;
 +    }
 +
 +    /**
 +     * Get the MerkleTree.TreeRange responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    @VisibleForTesting
 +    public MerkleTree.TreeRange get(Token t)
 +    {
 +        return getMerkleTree(t).get(t);
 +    }
 +
 +    /**
 +     * Init all MerkleTree's with an even tree distribution.
 +     */
 +    public void init()
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            init(range);
 +        }
 +    }
 +
 +    /**
 +     * Init a selected MerkleTree with an even tree distribution.
 +     * 
 +     * @param range
 +     */
 +    public void init(Range<Token> range)
 +    {
 +        merkleTrees.get(range).init();
 +    }
 +
 +    /**
 +     * Split the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    public boolean split(Token t)
 +    {
 +        return getMerkleTree(t).split(t);
 +    }
 +
 +    /**
 +     * Invalidate the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     */
 +    @VisibleForTesting
 +    public void invalidate(Token t)
 +    {
 +        getMerkleTree(t).invalidate(t);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token range.
 +     * 
 +     * @param range
 +     * @return the tree mapped to the given range, or null if there is no entry for it
 +     */
 +    public MerkleTree getMerkleTree(Range<Token> range)
 +    {
 +        return merkleTrees.get(range);
 +    }
 +
 +    public long size()
 +    {
 +        long size = 0;
 +
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            size += tree.size();
 +        }
 +
 +        return size;
 +    }
 +
 +    /**
 +     * Forward the given maxsize to the MerkleTree registered for the range.
 +     */
 +    @VisibleForTesting
 +    public void maxsize(Range<Token> range, int maxsize)
 +    {
 +        MerkleTree target = getMerkleTree(range);
 +        target.maxsize(maxsize);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token.
 +     * 
 +     * Ranges are examined in the iteration order of the backing map and the
 +     * tree of the first range containing the token is returned.
 +     * 
 +     * @param t token to look up
 +     * @return the MerkleTree mapped to the first range that contains t
 +     * @throws AssertionError if no registered range contains t (this method
 +     *         does not return null for a missing tree)
 +     */
 +    private MerkleTree getMerkleTree(Token t)
 +    {
 +        // iterate entries so the tree comes with its range, no second lookup needed
 +        for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
 +        {
 +            if (entry.getKey().contains(t))
 +                return entry.getValue();
 +        }
 +
 +        throw new AssertionError("Expected tree for token " + t);
 +    }
 +
 +    /**
 +     * Register every tree of the collection, in its iteration order, via addTree.
 +     */
 +    private void addTrees(Collection<MerkleTree> trees)
 +    {
 +        trees.forEach(this::addTree);
 +    }
 +
 +    /**
 +     * Register a single tree under its full range. When assertions are
 +     * enabled, the range is first checked against all ranges already present.
 +     */
 +    private void addTree(MerkleTree tree)
 +    {
 +        Range<Token> treeRange = tree.fullRange;
 +        assert validateNonOverlapping(tree) : "Range [" + treeRange + "] is intersecting an existing range";
 +
 +        merkleTrees.put(treeRange, tree);
 +    }
 +
 +    /**
 +     * Check that the tree's full range intersects none of the ranges already
 +     * registered.
 +     * 
 +     * @return true when no registered range intersects tree.fullRange
 +     */
 +    private boolean validateNonOverlapping(MerkleTree tree)
 +    {
 +        return merkleTrees.keySet()
 +                          .stream()
 +                          .noneMatch(existing -> tree.fullRange.intersects(existing));
 +    }
 +
 +    /**
 +     * Get an iterator for all the invalids generated by the MerkleTrees.
 +     * 
 +     * Each call creates a fresh iterator that walks the registered trees and
 +     * chains the per-tree invalids() iterators one after another.
 +     * 
 +     * @return a new TreeRangeIterator over this instance's trees
 +     */
 +    public TreeRangeIterator invalids()
 +    {
 +        return new TreeRangeIterator();
 +    }
 +
 +    /**
 +     * Write the row-count-per-leaf histogram of each MerkleTree to the logger.
 +     * 
 +     * @param logger destination handed to every histogram's log call
 +     */
 +    public void logRowCountPerLeaf(Logger logger)
 +    {
 +        merkleTrees.values().forEach(tree -> tree.histogramOfRowCountPerLeaf().log(logger));
 +    }
 +
 +    /**
 +     * Write the row-size-per-leaf histogram of each MerkleTree to the logger.
 +     * 
 +     * @param logger destination handed to every histogram's log call
 +     */
 +    public void logRowSizePerLeaf(Logger logger)
 +    {
 +        merkleTrees.values().forEach(tree -> tree.histogramOfRowSizePerLeaf().log(logger));
 +    }
 +
 +    /**
 +     * Concatenate the hashes of the given range from every MerkleTree whose
 +     * own range intersects it, in map iteration order.
 +     * 
 +     * Trees that return a null hash for the range contribute nothing.
 +     * 
 +     * @param range range to hash
 +     * @return the concatenated hash bytes, or null when no intersecting tree
 +     *         produced a hash
 +     * @throws RuntimeException wrapping the IOException if appending to the
 +     *         in-memory buffer fails
 +     */
 +    @VisibleForTesting
 +    public byte[] hash(Range<Token> range)
 +    {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        boolean hashed = false;
 +
 +        try
 +        {
 +            for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
 +            {
 +                if (!entry.getKey().intersects(range))
 +                    continue;
 +
 +                byte[] bytes = entry.getValue().hash(range);
 +                if (bytes != null)
 +                {
 +                    baos.write(bytes);
 +                    hashed = true;
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // keep the original exception as the cause so the failure stays diagnosable
 +            throw new RuntimeException("Unable to append merkle tree hash to result", e);
 +        }
 +
 +        return hashed ? baos.toByteArray() : null;
 +    }
 +
 +    /**
 +     * Get an iterator of all ranges and their MerkleTrees.
 +     * 
 +     * The iterator comes straight from the backing map's entry set, so it
 +     * follows that map's iteration order.
 +     * 
 +     * @return iterator over (range, MerkleTree) entries
 +     */
 +    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
 +    {
 +        return merkleTrees.entrySet().iterator();
 +    }
 +
++    /**
++     * Total row count, i.e. the sum of rowCount() over every registered
++     * MerkleTree (0 when there are none).
++     */
++    public long rowCount()
++    {
++        return merkleTrees.values().stream().mapToLong(MerkleTree::rowCount).sum();
++    }
++
 +    /**
 +     * Iterator over the TreeRanges of all MerkleTrees: walks the trees in map
 +     * value order and drains each tree's invalids() iterator in turn.
 +     */
 +    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
 +            Iterable<MerkleTree.TreeRange>,
 +            PeekingIterator<MerkleTree.TreeRange>
 +    {
 +        // remaining trees to visit
 +        private final Iterator<MerkleTree> it;
 +
 +        // per-tree iterator currently being drained; null until the first tree is opened
 +        private MerkleTree.TreeRangeIterator current = null;
 +
 +        private TreeRangeIterator()
 +        {
 +            it = merkleTrees.values().iterator();
 +        }
 +
 +        public MerkleTree.TreeRange computeNext()
 +        {
 +            // keep draining the current tree while it still has ranges
 +            if (current != null && current.hasNext())
 +                return current.next();
 +
 +            return nextIterator();
 +        }
 +
 +        // Move on to the next tree and return its first range, or signal the end.
 +        private MerkleTree.TreeRange nextIterator()
 +        {
 +            if (!it.hasNext())
 +                return endOfData();
 +
 +            current = it.next().invalids();
 +
 +            return current.next();
 +        }
 +
 +        public Iterator<MerkleTree.TreeRange> iterator()
 +        {
 +            return this;
 +        }
 +    }
 +
 +    /**
 +     * Get the differences between the two sets of MerkleTrees.
 +     * 
 +     * Every tree of ltree is compared, via MerkleTree.difference, with the
 +     * tree that rtree has registered under the same full range; the per-tree
 +     * results are appended in ltree's iteration order. Trees that exist only
 +     * in rtree are never visited.
 +     * NOTE(review): the right-hand lookup result is passed on unchecked, so
 +     * both sides are presumably expected to cover the same ranges - confirm.
 +     * 
 +     * @param ltree left-hand set of trees; drives the iteration
 +     * @param rtree right-hand set of trees; looked up by each left tree's full range
 +     * @return list of all ranges reported as different by the per-tree comparisons
 +     */
 +    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
 +    {
 +        List<Range<Token>> differences = new ArrayList<>();
 +        for (MerkleTree tree : ltree.merkleTrees.values())
 +        {
 +            differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
 +        }
 +        return differences;
 +    }
 +
 +    /**
 +     * Versioned serializer for MerkleTrees. The wire layout is the number of
 +     * trees as an int, followed by each tree written with MerkleTree.serializer.
 +     */
 +    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
 +    {
 +        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
 +        {
 +            // tree count first, then the trees in map value order
 +            out.writeInt(trees.merkleTrees.size());
 +            for (MerkleTree each : trees.merkleTrees.values())
 +                MerkleTree.serializer.serialize(each, out, version);
 +        }
 +
 +        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            int count = in.readInt();
 +            Collection<MerkleTree> trees = new ArrayList<>(count);
 +
 +            // taken from the first tree read; stays null when no tree is read
 +            IPartitioner partitioner = null;
 +            for (int read = 0; read < count; read++)
 +            {
 +                MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
 +                trees.add(tree);
 +
 +                if (partitioner == null)
 +                    partitioner = tree.partitioner();
 +                else
 +                    assert tree.partitioner() == partitioner;
 +            }
 +
 +            return new MerkleTrees(partitioner, trees);
 +        }
 +
 +        public long serializedSize(MerkleTrees trees, int version)
 +        {
 +            assert trees != null;
 +
 +            // size of the leading tree count, then each serialized tree
 +            long total = TypeSizes.sizeof(trees.merkleTrees.size());
 +            for (MerkleTree each : trees.merkleTrees.values())
 +                total += MerkleTree.serializer.serializedSize(each, version);
 +
 +            return total;
 +        }
 +
 +    }
 +
 +    private static class TokenRangeComparator implements Comparator<Range<Token>>
 +    {
 +        @Override
 +        public int compare(Range<Token> rt1, Range<Token> rt2)
 +        {
 +            if (rt1.left.compareTo(rt2.left) == 0)
 +                return 0;
 +
 +            return rt1.compareTo(rt2);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
          return store;
      }
  
-     private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+     public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
      {
          long timestamp = System.currentTimeMillis();
 +        CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
          for (int i = startRowKey; i <= endRowKey; i++)
          {
              DecoratedKey key = Util.dk(Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
- 
  package org.apache.cassandra.repair;
  
 -import java.io.IOException;
  import java.net.InetAddress;
 -import java.security.MessageDigest;
 +import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
  import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
  
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.util.SequentialWriter;
  import org.junit.After;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.repair.messages.RepairMessage;
  import org.apache.cassandra.repair.messages.ValidationComplete;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
 -import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
 -import static org.junit.Assert.*;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
  
  public class ValidatorTest
  {
+     private static final long TEST_TIMEOUT = 60; //seconds
+ 
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
 -    private final IPartitioner partitioner = StorageService.getPartitioner();
 +    private static IPartitioner partitioner;
  
      @BeforeClass
      public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
      public void testValidatorComplete() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
-         MessagingService.instance().addMessageSink(new IMessageSink()
-         {
-             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-             {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertTrue(((ValidationComplete) m).success());
-                         assertNotNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
-                 return false;
-             }
- 
-             public boolean allowIncomingMessage(MessageIn message, int id)
-             {
-                 return false;
-             }
-         });
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
  
          InetAddress remote = InetAddress.getByName("127.0.0.2");
  
@@@ -130,37 -116,128 +118,111 @@@
          Token min = tree.partitioner().getMinimumToken();
          assertNotNull(tree.hash(new Range<>(min, min)));
  
-         if (!lock.isSignaled())
-             lock.await();
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        assertNotNull(((ValidationComplete) m).tree);
++        assertTrue(((ValidationComplete) m).success());
++        assertNotNull(((ValidationComplete) m).trees);
      }
  
 -    private static class CompactedRowStub extends AbstractCompactedRow
 -    {
 -        private CompactedRowStub(DecoratedKey key)
 -        {
 -            super(key);
 -        }
 -
 -        public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void update(MessageDigest digest) { }
 -
 -        public ColumnStats columnStats()
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void close() throws IOException { }
 -    }
  
      @Test
      public void testValidatorFailed() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ 
+         InetAddress remote = InetAddress.getByName("127.0.0.2");
+ 
+         Validator validator = new Validator(desc, remote, 0);
+         validator.fail();
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertFalse(((ValidationComplete) m).success);
 -        assertNull(((ValidationComplete) m).tree);
++        assertFalse(((ValidationComplete) m).success());
++        assertNull(((ValidationComplete) m).trees);
+     }
+ 
+     @Test
+     public void simpleValidationTest128() throws Exception
+     {
+         simpleValidationTest(128);
+     }
+ 
+     @Test
+     public void simpleValidationTest1500() throws Exception
+     {
+         simpleValidationTest(1500);
+     }
+ 
+     /**
+      * Test for CASSANDRA-5263
+      * 1. Create N rows
+      * 2. Run validation compaction
+      * 3. Expect merkle tree with size 2^ceil(log2(n))
+      */
+     public void simpleValidationTest(int n) throws Exception
+     {
+         Keyspace ks = Keyspace.open(keyspace);
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         // disable compaction while flushing
+         cfs.disableAutoCompaction();
+ 
+         CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=0
+ 
+         cfs.forceBlockingFlush();
 -        assertEquals(1, cfs.getSSTables().size());
++        assertEquals(1, cfs.getLiveSSTables().size());
+ 
+         // wait enough to force single compaction
+         TimeUnit.SECONDS.sleep(5);
+ 
 -        SSTableReader sstable = cfs.getSSTables().iterator().next();
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+         UUID repairSessionId = UUIDGen.getTimeUUID();
+         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
 -                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
 -                                                                                             sstable.last.getToken()));
++                                               cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                                                                sstable.last.getToken())));
+ 
+         ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
 -                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
 -                                                                 false, false);
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false);
+ 
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+         CompactionManager.instance.submitValidation(cfs, validator);
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        MerkleTree tree = ((ValidationComplete) m).tree;
++        assertTrue(((ValidationComplete) m).success());
++        MerkleTrees trees = ((ValidationComplete) m).trees;
+ 
 -        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
 -        assertEquals(tree.rowCount(), n);
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        while (iterator.hasNext())
++        {
++            assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++        }
++        assertEquals(trees.rowCount(), n);
+     }
+ 
 -    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++    private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+     {
 -        final SettableFuture<MessageOut> future = SettableFuture.create();
++        final CompletableFuture<MessageOut> future = new CompletableFuture<>();
          MessagingService.instance().addMessageSink(new IMessageSink()
          {
              public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
              {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertFalse(((ValidationComplete) m).success());
-                         assertNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
 -                future.set(message);
++                future.complete(message);
                  return false;
              }
  


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3b34dc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3b34dc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3b34dc8

Branch: refs/heads/trunk
Commit: e3b34dc8584373c9f503e830ff3241e0865ab994
Parents: 57e9a83 413e48e
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:12 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:12 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c33b1d3,9076e7a..ba08745
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,59 -35,12 +91,60 @@@ Merged from 3.0
   * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
   * Calculate last compacted key on startup (CASSANDRA-6216)
   * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
++ * Fix merkle tree depth calculation (CASSANDRA-12580)
 + * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
 + * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642)
 + * Decrement pending range calculator jobs counter in finally block
 + * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
 + * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
 + * Fail repair on non-existing table (CASSANDRA-12279)
 + * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
 +
 +
 +3.8, 3.9
 + * Fix value skipping with counter columns (CASSANDRA-11726)
 + * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
 + * Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
 + * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
 + * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
   * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
   * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 - * Backport CASSANDRA-12002 (CASSANDRA-12177)
   * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 - * Fix potential bad messaging service message for paged range reads
 -   within mixed-version 3.x clusters (CASSANDRA-12249)
   * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
   * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
   * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------


[04/10] cassandra git commit: Fix merkle tree depth calculation

Posted by yu...@apache.org.
Fix merkle tree depth calculation

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630

Branch: refs/heads/trunk
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 159 ++++++++++++-------
 6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  
  
@@ -36,7 +37,6 @@
  * Don't write shadowed range tombstone (CASSANDRA-12030)
 Merged from 2.1:
  * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) 
  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
-            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
     public final RepairJobDesc desc;
     public final InetAddress initiator;
     public final int gcBefore;
+    private final boolean evenTreeDistribution;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
+        this(desc, initiator, gcBefore, false);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+    {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
+        this.evenTreeDistribution = evenTreeDistribution;
     }
 
     public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
         this.tree = tree;
 
-        if (!tree.partitioner().preservesOrder())
+        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
         {
             // You can't beat an even tree distribution for md5
             tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
+    public long rowCount()
+    {
+        long count = 0;
+        for (TreeRange range : new TreeRangeIterator(this))
+        {
+            count += range.hashable.rowsInRange;
+        }
+        return count;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
         return store;
     }
 
-    private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+    public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
     {
         long timestamp = System.currentTimeMillis();
         for (int i = startRowKey; i <= endRowKey; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 
 public class ValidatorTest
 {
+    private static final long TEST_TIMEOUT = 60; //seconds
+
     private static final String keyspace = "ValidatorTest";
     private static final String columnFamily = "Standard1";
     private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
-        MessagingService.instance().addMessageSink(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-            {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
-                return false;
-            }
-
-            public boolean allowIncomingMessage(MessageIn message, int id)
-            {
-                return false;
-            }
-        });
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
@@ -131,8 +116,13 @@ public class ValidatorTest
         Token min = tree.partitioner().getMinimumToken();
         assertNotNull(tree.hash(new Range<>(min, min)));
 
-        if (!lock.isSignaled())
-            lock.await();
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        assertNotNull(((ValidationComplete) m).tree);
     }
 
     private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertFalse(((ValidationComplete) m).success);
+        assertNull(((ValidationComplete) m).tree);
+    }
+
+    @Test
+    public void simpleValidationTest128() throws Exception
+    {
+        simpleValidationTest(128);
+    }
+
+    @Test
+    public void simpleValidationTest1500() throws Exception
+    {
+        simpleValidationTest(1500);
+    }
+
+    /**
+     * Test for CASSANDRA-5263
+     * 1. Create N rows
+     * 2. Run validation compaction
+     * 3. Expect merkle tree with size 2^ceil(log2(n))
+     */
+    public void simpleValidationTest(int n) throws Exception
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        // disable compaction while flushing
+        cfs.disableAutoCompaction();
+
+        CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=0
+
+        cfs.forceBlockingFlush();
+        assertEquals(1, cfs.getSSTables().size());
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        UUID repairSessionId = UUIDGen.getTimeUUID();
+        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+                                                                                             sstable.last.getToken()));
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
+                                                                 false, false);
+
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+        CompactionManager.instance.submitValidation(cfs, validator);
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        MerkleTree tree = ((ValidationComplete) m).tree;
+
+        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+        assertEquals(tree.rowCount(), n);
+    }
+
+    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+    {
+        final SettableFuture<MessageOut> future = SettableFuture.create();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            @SuppressWarnings("unchecked")
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
+                future.set(message);
                 return false;
             }
 
@@ -192,13 +246,6 @@ public class ValidatorTest
                 return false;
             }
         });
-
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
-
-        Validator validator = new Validator(desc, remote, 0);
-        validator.fail();
-
-        if (!lock.isSignaled())
-            lock.await();
+        return future;
     }
 }


[03/10] cassandra git commit: Fix merkle tree depth calculation

Posted by yu...@apache.org.
Fix merkle tree depth calculation

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630

Branch: refs/heads/cassandra-3.X
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 159 ++++++++++++-------
 6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
  * Make Collections deserialization more robust (CASSANDRA-12618)
  
  
@@ -36,7 +37,6 @@
  * Don't write shadowed range tombstone (CASSANDRA-12030)
 Merged from 2.1:
  * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) 
  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
-            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
             MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
     public final RepairJobDesc desc;
     public final InetAddress initiator;
     public final int gcBefore;
+    private final boolean evenTreeDistribution;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
 
     public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
     {
+        this(desc, initiator, gcBefore, false);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+    {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         validated = 0;
         range = null;
         ranges = null;
+        this.evenTreeDistribution = evenTreeDistribution;
     }
 
     public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
     {
         this.tree = tree;
 
-        if (!tree.partitioner().preservesOrder())
+        if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
         {
             // You can't beat an even tree distribution for md5
             tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
         return histbuild.buildWithStdevRangesAroundMean();
     }
 
+    public long rowCount()
+    {
+        long count = 0;
+        for (TreeRange range : new TreeRangeIterator(this))
+        {
+            count += range.hashable.rowsInRange;
+        }
+        return count;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
         return store;
     }
 
-    private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+    public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
     {
         long timestamp = System.currentTimeMillis();
         for (int i = startRowKey; i <= endRowKey; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.*;
 
 public class ValidatorTest
 {
+    private static final long TEST_TIMEOUT = 60; //seconds
+
     private static final String keyspace = "ValidatorTest";
     private static final String columnFamily = "Standard1";
     private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
-        MessagingService.instance().addMessageSink(new IMessageSink()
-        {
-            @SuppressWarnings("unchecked")
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-            {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
-                return false;
-            }
-
-            public boolean allowIncomingMessage(MessageIn message, int id)
-            {
-                return false;
-            }
-        });
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
@@ -131,8 +116,13 @@ public class ValidatorTest
         Token min = tree.partitioner().getMinimumToken();
         assertNotNull(tree.hash(new Range<>(min, min)));
 
-        if (!lock.isSignaled())
-            lock.await();
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        assertNotNull(((ValidationComplete) m).tree);
     }
 
     private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
-        final SimpleCondition lock = new SimpleCondition();
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertFalse(((ValidationComplete) m).success);
+        assertNull(((ValidationComplete) m).tree);
+    }
+
+    @Test
+    public void simpleValidationTest128() throws Exception
+    {
+        simpleValidationTest(128);
+    }
+
+    @Test
+    public void simpleValidationTest1500() throws Exception
+    {
+        simpleValidationTest(1500);
+    }
+
+    /**
+     * Test for CASSANDRA-5263
+     * 1. Create N rows
+     * 2. Run validation compaction
+     * 3. Expect merkle tree with size 2^ceil(log2(n))
+     */
+    public void simpleValidationTest(int n) throws Exception
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+        cfs.clearUnsafe();
+
+        // disable compaction while flushing
+        cfs.disableAutoCompaction();
+
+        CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=0
+
+        cfs.forceBlockingFlush();
+        assertEquals(1, cfs.getSSTables().size());
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        UUID repairSessionId = UUIDGen.getTimeUUID();
+        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+                                                                                             sstable.last.getToken()));
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
+                                                                 false, false);
+
+        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+        CompactionManager.instance.submitValidation(cfs, validator);
+
+        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+        RepairMessage m = (RepairMessage) message.payload;
+        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(desc, m.desc);
+        assertTrue(((ValidationComplete) m).success);
+        MerkleTree tree = ((ValidationComplete) m).tree;
+
+        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+        assertEquals(tree.rowCount(), n);
+    }
+
+    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+    {
+        final SettableFuture<MessageOut> future = SettableFuture.create();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            @SuppressWarnings("unchecked")
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
-                try
-                {
-                    if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                    {
-                        RepairMessage m = (RepairMessage) message.payload;
-                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                        assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
-                    }
-                }
-                finally
-                {
-                    lock.signalAll();
-                }
+                future.set(message);
                 return false;
             }
 
@@ -192,13 +246,6 @@ public class ValidatorTest
                 return false;
             }
         });
-
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
-
-        Validator validator = new Validator(desc, remote, 0);
-        validator.fail();
-
-        if (!lock.isSignaled())
-            lock.await();
+        return future;
     }
 }


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3b34dc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3b34dc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3b34dc8

Branch: refs/heads/cassandra-3.X
Commit: e3b34dc8584373c9f503e830ff3241e0865ab994
Parents: 57e9a83 413e48e
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:12 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:12 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c33b1d3,9076e7a..ba08745
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,59 -35,12 +91,60 @@@ Merged from 3.0
   * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
   * Calculate last compacted key on startup (CASSANDRA-6216)
   * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
++ * Fix merkle tree depth calculation (CASSANDRA-12580)
 + * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
 + * Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
 + * Decrement pending range calculator jobs counter in finally block
 + * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
 + * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
 + * Fail repair on non-existing table (CASSANDRA-12279)
 + * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
 +
 +
 +3.8, 3.9
 + * Fix value skipping with counter columns (CASSANDRA-11726)
 + * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
 + * Fixed flaky SSTablesIteratedTest (CASSANDRA-12282)
 + * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
 + * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
   * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
   * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 - * Backport CASSANDRA-12002 (CASSANDRA-12177)
   * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 - * Fix potential bad messaging service message for paged range reads
 -   within mixed-version 3.x clusters (CASSANDRA-12249)
   * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
   * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
   * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6

Branch: refs/heads/cassandra-3.X
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
 -2.2.9
 +3.0.10
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
+  * Fix merkle tree depth calculation (CASSANDRA-12580)
+  * Make Collections deserialization more robust (CASSANDRA-12618)
 - 
 - 
 -2.2.8
   * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
   * Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block
    (CASSANDRA-12554)
 +Merged from 2.1:
-  * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
 +
 +
 +3.0.9
 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
 + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
 + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
 + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerException when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
 + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
 +Merged from 2.2:
   * Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
   * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
   * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;
                  else
 -                    gcBefore = getDefaultGcBefore(cfs);
 +                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
              }
  
 -            // Create Merkle tree suitable to hold estimated partitions for given range.
 -            // We blindly assume that partition is evenly distributed on all sstables for now.
 -            long numPartitions = 0;
 -            for (SSTableReader sstable : sstables)
 -            {
 -                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
 -            }
 -            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
 -            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
 -            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 -
 +            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +            // We blindly assume that a partition is evenly distributed on all sstables for now.
-             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
 +            MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
              long start = System.nanoTime();
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
 +            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
 +                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
 +                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
              {
 -                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (ci.hasNext())
                  {
 -                    // validate the CF as we iterate over it
 -                    validator.prepare(cfs, tree);
 -                    while (iter.hasNext())
 +                    if (ci.isStopRequested())
 +                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    try (UnfilteredRowIterator partition = ci.next())
                      {
 -                        if (ci.isStopRequested())
 -                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                        AbstractCompactedRow row = iter.next();
 -                        validator.add(row);
 +                        validator.add(partition);
                      }
 -                    validator.complete();
                  }
 -                finally
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                if (isSnapshotValidation && !isGlobalSnapshotValidation)
                  {
                      // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
                      // is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
          }
      }
  
 +    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = 0;
 +        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = 0;
 +            for (SSTableReader sstable : sstables)
 +                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
 +            rangePartitionCounts.put(range, numPartitions);
 +            allPartitions += numPartitions;
 +        }
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++            // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++            // capping at 20 to prevent large tree (CASSANDRA-11390)
 +            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
-             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
      private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
      {
          Refs<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
          validated = 0;
          range = null;
          ranges = null;
+         this.evenTreeDistribution = evenTreeDistribution;
      }
  
 -    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
 +    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
      {
 -        this.tree = tree;
 +        this.trees = tree;
  
-         if (!tree.partitioner().preservesOrder())
+         if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
          {
              // You can't beat an even tree distribution for md5
              tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.utils;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.PeekingIterator;
 +import org.slf4j.Logger;
 +
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +
 +
 +/**
 + * Wrapper class for handling of multiple MerkleTrees at once.
 + * 
 + * The MerkleTree instances are divided into Ranges of non-overlapping tokens.
 + */
 +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
 +{
 +    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
 +
 +    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
 +
 +    private IPartitioner partitioner;
 +
 +    /**
 +     * Creates empty MerkleTrees object.
 +     * 
 +     * @param partitioner The partitioner to use
 +     */
 +    public MerkleTrees(IPartitioner partitioner)
 +    {
 +        this(partitioner, new ArrayList<>());
 +    }
 +
 +    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
 +    {
 +        this.partitioner = partitioner;
 +        addTrees(merkleTrees);
 +    }
 +
 +    /**
 +     * Get the ranges that these merkle trees cover.
 +     * 
 +     * @return
 +     */
 +    public Collection<Range<Token>> ranges()
 +    {
 +        return merkleTrees.keySet();
 +    }
 +
 +    /**
 +     * Get the partitioner in use.
 +     * 
 +     * @return
 +     */
 +    public IPartitioner partitioner()
 +    {
 +        return partitioner;
 +    }
 +
 +    /**
 +     * Add merkle trees with the defined maxsize and ranges.
 +     * 
 +     * @param maxsize
 +     * @param ranges
 +     */
 +    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
 +    {
 +        for (Range<Token> range : ranges)
 +        {
 +            addMerkleTree(maxsize, range);
 +        }
 +    }
 +
 +    /**
 +     * Add a MerkleTree with the defined size and range.
 +     * 
 +     * @param maxsize
 +     * @param range
 +     * @return The created merkle tree.
 +     */
 +    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
 +    {
 +        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
 +    }
 +
 +    @VisibleForTesting
 +    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
 +    {
 +        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
 +        addTree(tree);
 +
 +        return tree;
 +    }
 +
 +    /**
 +     * Get the MerkleTree.Range responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    @VisibleForTesting
 +    public MerkleTree.TreeRange get(Token t)
 +    {
 +        return getMerkleTree(t).get(t);
 +    }
 +
 +    /**
 +     * Init all MerkleTree instances with an even tree distribution.
 +     */
 +    public void init()
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            init(range);
 +        }
 +    }
 +
 +    /**
 +     * Init a selected MerkleTree with an even tree distribution.
 +     * 
 +     * @param range
 +     */
 +    public void init(Range<Token> range)
 +    {
 +        merkleTrees.get(range).init();
 +    }
 +
 +    /**
 +     * Split the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    public boolean split(Token t)
 +    {
 +        return getMerkleTree(t).split(t);
 +    }
 +
 +    /**
 +     * Invalidate the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     */
 +    @VisibleForTesting
 +    public void invalidate(Token t)
 +    {
 +        getMerkleTree(t).invalidate(t);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token range.
 +     * 
 +     * @param range
 +     * @return
 +     */
 +    public MerkleTree getMerkleTree(Range<Token> range)
 +    {
 +        return merkleTrees.get(range);
 +    }
 +
 +    public long size()
 +    {
 +        long size = 0;
 +
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            size += tree.size();
 +        }
 +
 +        return size;
 +    }
 +
 +    @VisibleForTesting
 +    public void maxsize(Range<Token> range, int maxsize)
 +    {
 +        getMerkleTree(range).maxsize(maxsize);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return The MerkleTree whose range contains the token; an AssertionError is thrown if none exists.
 +     */
 +    private MerkleTree getMerkleTree(Token t)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (range.contains(t))
 +                return merkleTrees.get(range);
 +        }
 +
 +        throw new AssertionError("Expected tree for token " + t);
 +    }
 +
 +    private void addTrees(Collection<MerkleTree> trees)
 +    {
 +        for (MerkleTree tree : trees)
 +        {
 +            addTree(tree);
 +        }
 +    }
 +
 +    private void addTree(MerkleTree tree)
 +    {
 +        assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
 +
 +        merkleTrees.put(tree.fullRange, tree);
 +    }
 +
 +    private boolean validateNonOverlapping(MerkleTree tree)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (tree.fullRange.intersects(range))
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Get an iterator for all the invalids generated by the MerkleTrees.
 +     * 
 +     * @return
 +     */
 +    public TreeRangeIterator invalids()
 +    {
 +        return new TreeRangeIterator();
 +    }
 +
 +    /**
 +     * Log the row count per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowCountPerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowCountPerLeaf().log(logger);
 +        }
 +    }
 +
 +    /**
 +     * Log the row size per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowSizePerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowSizePerLeaf().log(logger);
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public byte[] hash(Range<Token> range)
 +    {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        boolean hashed = false;
 +
 +        try
 +        {
 +            for (Range<Token> rt : merkleTrees.keySet())
 +            {
 +                if (rt.intersects(range))
 +                {
 +                    byte[] bytes = merkleTrees.get(rt).hash(range);
 +                    if (bytes != null)
 +                    {
 +                        baos.write(bytes);
 +                        hashed = true;
 +                    }
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException("Unable to append merkle tree hash to result");
 +        }
 +        
 +        return hashed ? baos.toByteArray() : null;
 +    }
 +
 +    /**
 +     * Get an iterator of all ranges and their MerkleTrees.
 +     */
 +    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
 +    {
 +        return merkleTrees.entrySet().iterator();
 +    }
 +
++    public long rowCount()
++    {
++        long totalCount = 0;
++        for (MerkleTree tree : merkleTrees.values())
++        {
++            totalCount += tree.rowCount();
++        }
++        return totalCount;
++    }
++
 +    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
 +            Iterable<MerkleTree.TreeRange>,
 +            PeekingIterator<MerkleTree.TreeRange>
 +    {
 +        private final Iterator<MerkleTree> it;
 +
 +        private MerkleTree.TreeRangeIterator current = null;
 +
 +        private TreeRangeIterator()
 +        {
 +            it = merkleTrees.values().iterator();
 +        }
 +
 +        public MerkleTree.TreeRange computeNext()
 +        {
 +            if (current == null || !current.hasNext())
 +                return nextIterator();
 +
 +            return current.next();
 +        }
 +
 +        private MerkleTree.TreeRange nextIterator()
 +        {
 +            if (it.hasNext())
 +            {
 +                current = it.next().invalids();
 +
 +                return current.next();
 +            }
 +
 +            return endOfData();
 +        }
 +
 +        public Iterator<MerkleTree.TreeRange> iterator()
 +        {
 +            return this;
 +        }
 +    }
 +
 +    /**
 +     * Get the differences between the two sets of MerkleTrees.
 +     * 
 +     * @param ltree
 +     * @param rtree
 +     * @return
 +     */
 +    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
 +    {
 +        List<Range<Token>> differences = new ArrayList<>();
 +        for (MerkleTree tree : ltree.merkleTrees.values())
 +        {
 +            differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
 +        }
 +        return differences;
 +    }
 +
 +    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
 +    {
 +        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeInt(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                MerkleTree.serializer.serialize(tree, out, version);
 +            }
 +        }
 +
 +        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            IPartitioner partitioner = null;
 +            int nTrees = in.readInt();
 +            Collection<MerkleTree> trees = new ArrayList<>(nTrees);
 +            if (nTrees > 0)
 +            {
 +                for (int i = 0; i < nTrees; i++)
 +                {
 +                    MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
 +                    trees.add(tree);
 +
 +                    if (partitioner == null)
 +                        partitioner = tree.partitioner();
 +                    else
 +                        assert tree.partitioner() == partitioner;
 +                }
 +            }
 +
 +            return new MerkleTrees(partitioner, trees);
 +        }
 +
 +        public long serializedSize(MerkleTrees trees, int version)
 +        {
 +            assert trees != null;
 +
 +            long size = TypeSizes.sizeof(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                size += MerkleTree.serializer.serializedSize(tree, version);
 +            }
 +            return size;
 +        }
 +
 +    }
 +
 +    private static class TokenRangeComparator implements Comparator<Range<Token>>
 +    {
 +        @Override
 +        public int compare(Range<Token> rt1, Range<Token> rt2)
 +        {
 +            if (rt1.left.compareTo(rt2.left) == 0)
 +                return 0;
 +
 +            return rt1.compareTo(rt2);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
          return store;
      }
  
-     private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+     public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
      {
          long timestamp = System.currentTimeMillis();
 +        CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
          for (int i = startRowKey; i <= endRowKey; i++)
          {
              DecoratedKey key = Util.dk(Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
- 
  package org.apache.cassandra.repair;
  
 -import java.io.IOException;
  import java.net.InetAddress;
 -import java.security.MessageDigest;
 +import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
  import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
  
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.util.SequentialWriter;
  import org.junit.After;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.repair.messages.RepairMessage;
  import org.apache.cassandra.repair.messages.ValidationComplete;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
 -import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
 -import static org.junit.Assert.*;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
  
  public class ValidatorTest
  {
+     private static final long TEST_TIMEOUT = 60; //seconds
+ 
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
 -    private final IPartitioner partitioner = StorageService.getPartitioner();
 +    private static IPartitioner partitioner;
  
      @BeforeClass
      public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
      public void testValidatorComplete() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
-         MessagingService.instance().addMessageSink(new IMessageSink()
-         {
-             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-             {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertTrue(((ValidationComplete) m).success());
-                         assertNotNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
-                 return false;
-             }
- 
-             public boolean allowIncomingMessage(MessageIn message, int id)
-             {
-                 return false;
-             }
-         });
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
  
          InetAddress remote = InetAddress.getByName("127.0.0.2");
  
@@@ -130,37 -116,128 +118,111 @@@
          Token min = tree.partitioner().getMinimumToken();
          assertNotNull(tree.hash(new Range<>(min, min)));
  
-         if (!lock.isSignaled())
-             lock.await();
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        assertNotNull(((ValidationComplete) m).tree);
++        assertTrue(((ValidationComplete) m).success());
++        assertNotNull(((ValidationComplete) m).trees);
      }
  
 -    private static class CompactedRowStub extends AbstractCompactedRow
 -    {
 -        private CompactedRowStub(DecoratedKey key)
 -        {
 -            super(key);
 -        }
 -
 -        public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void update(MessageDigest digest) { }
 -
 -        public ColumnStats columnStats()
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void close() throws IOException { }
 -    }
  
      @Test
      public void testValidatorFailed() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ 
+         InetAddress remote = InetAddress.getByName("127.0.0.2");
+ 
+         Validator validator = new Validator(desc, remote, 0);
+         validator.fail();
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertFalse(((ValidationComplete) m).success);
 -        assertNull(((ValidationComplete) m).tree);
++        assertFalse(((ValidationComplete) m).success());
++        assertNull(((ValidationComplete) m).trees);
+     }
+ 
+     @Test
+     public void simpleValidationTest128() throws Exception
+     {
+         simpleValidationTest(128);
+     }
+ 
+     @Test
+     public void simpleValidationTest1500() throws Exception
+     {
+         simpleValidationTest(1500);
+     }
+ 
+     /**
+      * Test for CASSANDRA-5263
+      * 1. Create N rows
+      * 2. Run validation compaction
+      * 3. Expect merkle tree with size 2^(log2(n))
+      */
+     public void simpleValidationTest(int n) throws Exception
+     {
+         Keyspace ks = Keyspace.open(keyspace);
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         // disable compaction while flushing
+         cfs.disableAutoCompaction();
+ 
+         CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+ 
+         cfs.forceBlockingFlush();
 -        assertEquals(1, cfs.getSSTables().size());
++        assertEquals(1, cfs.getLiveSSTables().size());
+ 
+         // wait enough to force single compaction
+         TimeUnit.SECONDS.sleep(5);
+ 
 -        SSTableReader sstable = cfs.getSSTables().iterator().next();
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+         UUID repairSessionId = UUIDGen.getTimeUUID();
+         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
 -                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
 -                                                                                             sstable.last.getToken()));
++                                               cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                                                                sstable.last.getToken())));
+ 
+         ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
 -                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
 -                                                                 false, false);
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false);
+ 
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+         CompactionManager.instance.submitValidation(cfs, validator);
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        MerkleTree tree = ((ValidationComplete) m).tree;
++        assertTrue(((ValidationComplete) m).success());
++        MerkleTrees trees = ((ValidationComplete) m).trees;
+ 
 -        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
 -        assertEquals(tree.rowCount(), n);
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        while (iterator.hasNext())
++        {
++            assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++        }
++        assertEquals(trees.rowCount(), n);
+     }
+ 
 -    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++    private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+     {
 -        final SettableFuture<MessageOut> future = SettableFuture.create();
++        final CompletableFuture<MessageOut> future = new CompletableFuture<>();
          MessagingService.instance().addMessageSink(new IMessageSink()
          {
              public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
              {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertFalse(((ValidationComplete) m).success());
-                         assertNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
 -                future.set(message);
++                future.complete(message);
                  return false;
              }
  


[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.X' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87825f82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87825f82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87825f82

Branch: refs/heads/trunk
Commit: 87825f8208fc21000a841f724c7f89f3702d8e70
Parents: c18968b e3b34dc
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:25 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:25 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87825f82/CHANGES.txt
----------------------------------------------------------------------