You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/09/29 20:06:38 UTC
[01/10] cassandra git commit: Fix merkle tree depth calculation
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 238393520 -> c70ce6307
refs/heads/cassandra-3.0 5cebd1fb0 -> 413e48e65
refs/heads/cassandra-3.X 57e9a83b2 -> e3b34dc85
refs/heads/trunk c18968b1b -> 87825f820
Fix merkle tree depth calculation
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630
Branch: refs/heads/cassandra-2.2
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/CompactionManager.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++-------
6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
* Make Collections deserialization more robust (CASSANDRA-12618)
@@ -36,7 +37,6 @@
* Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
* Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
{
numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
public final int gcBefore;
+ private final boolean evenTreeDistribution;
// null when all rows with the min token have been consumed
private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
+ this(desc, initiator, gcBefore, false);
+ }
+
+ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+ {
this.desc = desc;
this.initiator = initiator;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
this.tree = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
return histbuild.buildWithStdevRangesAroundMean();
}
+ public long rowCount()
+ {
+ long count = 0;
+ for (TreeRange range : new TreeRangeIterator(this))
+ {
+ count += range.hashable.rowsInRange;
+ }
+ return count;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
for (int i = startRowKey; i <= endRowKey; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- @SuppressWarnings("unchecked")
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@ -131,8 +116,13 @@ public class ValidatorTest
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ assertNotNull(((ValidationComplete) m).tree);
}
private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertFalse(((ValidationComplete) m).success);
+ assertNull(((ValidationComplete) m).tree);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^(ceil(log2(n)))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+ cfs.forceBlockingFlush();
+ assertEquals(1, cfs.getSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+ sstable.last.getToken()));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs), Collections.singleton(desc.range),
+ false, false);
+
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ MerkleTree tree = ((ValidationComplete) m).tree;
+
+ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+ assertEquals(tree.rowCount(), n);
+ }
+
+ private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+ {
+ final SettableFuture<MessageOut> future = SettableFuture.create();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
+ future.set(message);
return false;
}
@@ -192,13 +246,6 @@ public class ValidatorTest
return false;
}
});
-
- InetAddress remote = InetAddress.getByName("127.0.0.2");
-
- Validator validator = new Validator(desc, remote, 0);
- validator.fail();
-
- if (!lock.isSignaled())
- lock.await();
+ return future;
}
}
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6
Branch: refs/heads/cassandra-3.0
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
-2.2.9
+3.0.10
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+Merged from 2.2:
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
+ * Make Collections deserialization more robust (CASSANDRA-12618)
-
-
-2.2.8
* Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
* Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
* Decrement pending range calculator jobs counter in finally block
(CASSANDRA-12554)
+Merged from 2.1:
- * Make Collections deserialization more robust (CASSANDRA-12618)
+ * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
+
+
+3.0.9
+ * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
+ * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
+ * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
+ * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
+ * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
+ * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
+ * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
+ * Fix file handle leaks due to simultaneous compaction/repair and
+ listing snapshots, calculating snapshot sizes, or making schema
+ changes (CASSANDRA-11594)
+ * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
+ * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
+ * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
+ * Calculate last compacted key on startup (CASSANDRA-6216)
+ * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
+ * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
+ * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
+ * Backport CASSANDRA-12002 (CASSANDRA-12177)
+ * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
+ * Fix potential bad messaging service message for paged range reads
+ within mixed-version 3.x clusters (CASSANDRA-12249)
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
+ * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerException when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+ * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
+ * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
+Merged from 2.2:
* Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
* cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
* Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
- gcBefore = getDefaultGcBefore(cfs);
+ gcBefore = getDefaultGcBefore(cfs, nowInSec);
}
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
- int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
-
+ // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+ // We blindly assume that a partition is evenly distributed on all sstables for now.
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
+ ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
+ CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (ci.hasNext())
{
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ try (UnfilteredRowIterator partition = ci.next())
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ validator.add(partition);
}
- validator.complete();
}
- finally
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation && !isGlobalSnapshotValidation)
{
// we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
// is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
}
}
+ private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+ {
+ MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+ long allPartitions = 0;
+ Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
+ rangePartitionCounts.put(range, numPartitions);
+ allPartitions += numPartitions;
+ }
+
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = rangePartitionCounts.get(range);
+ double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++ // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++ // capping at 20 to prevent large tree (CASSANDRA-11390)
+ int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++ // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
+ tree.addMerkleTree((int) Math.pow(2, depth), range);
+ }
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
+ }
+
+ return tree;
+ }
+
private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
{
Refs<SSTableReader> sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
- public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
+ public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
{
- this.tree = tree;
+ this.trees = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+
+/**
+ * Wrapper class for handling of multiple MerkleTrees at once.
+ *
+ * The MerkleTrees are divided into Ranges of non-overlapping tokens.
+ */
+public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
+{
+ public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
+
+ private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+
+ private IPartitioner partitioner;
+
+ /**
+ * Creates empty MerkleTrees object.
+ *
+ * @param partitioner The partitioner to use
+ */
+ public MerkleTrees(IPartitioner partitioner)
+ {
+ this(partitioner, new ArrayList<>());
+ }
+
+ private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
+ {
+ this.partitioner = partitioner;
+ addTrees(merkleTrees);
+ }
+
+ /**
+ * Get the ranges that these merkle trees cover.
+ *
+ * @return
+ */
+ public Collection<Range<Token>> ranges()
+ {
+ return merkleTrees.keySet();
+ }
+
+ /**
+ * Get the partitioner in use.
+ *
+ * @return
+ */
+ public IPartitioner partitioner()
+ {
+ return partitioner;
+ }
+
+ /**
+ * Add merkle trees with the defined maxsize and ranges.
+ *
+ * @param maxsize
+ * @param ranges
+ */
+ public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
+ {
+ for (Range<Token> range : ranges)
+ {
+ addMerkleTree(maxsize, range);
+ }
+ }
+
+ /**
+ * Add a MerkleTree with the defined size and range.
+ *
+ * @param maxsize
+ * @param range
+ * @return The created merkle tree.
+ */
+ public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
+ {
+ return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
+ }
+
+ @VisibleForTesting
+ public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
+ {
+ MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
+ addTree(tree);
+
+ return tree;
+ }
+
+ /**
+ * Get the MerkleTree.TreeRange responsible for the given token.
+ *
+ * @param t
+ * @return
+ */
+ @VisibleForTesting
+ public MerkleTree.TreeRange get(Token t)
+ {
+ return getMerkleTree(t).get(t);
+ }
+
+ /**
+ * Init all MerkleTrees with an even tree distribution.
+ */
+ public void init()
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ init(range);
+ }
+ }
+
+ /**
+ * Init a selected MerkleTree with an even tree distribution.
+ *
+ * @param range
+ */
+ public void init(Range<Token> range)
+ {
+ merkleTrees.get(range).init();
+ }
+
+ /**
+ * Split the MerkleTree responsible for the given token.
+ *
+ * @param t
+ * @return
+ */
+ public boolean split(Token t)
+ {
+ return getMerkleTree(t).split(t);
+ }
+
+ /**
+ * Invalidate the MerkleTree responsible for the given token.
+ *
+ * @param t
+ */
+ @VisibleForTesting
+ public void invalidate(Token t)
+ {
+ getMerkleTree(t).invalidate(t);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token range.
+ *
+ * @param range
+ * @return
+ */
+ public MerkleTree getMerkleTree(Range<Token> range)
+ {
+ return merkleTrees.get(range);
+ }
+
+ public long size()
+ {
+ long size = 0;
+
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ size += tree.size();
+ }
+
+ return size;
+ }
+
+ @VisibleForTesting
+ public void maxsize(Range<Token> range, int maxsize)
+ {
+ getMerkleTree(range).maxsize(maxsize);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token.
+ *
+ * @param t
+ * @return The MerkleTree whose range contains the token (an AssertionError is thrown if none exists).
+ */
+ private MerkleTree getMerkleTree(Token t)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (range.contains(t))
+ return merkleTrees.get(range);
+ }
+
+ throw new AssertionError("Expected tree for token " + t);
+ }
+
+ private void addTrees(Collection<MerkleTree> trees)
+ {
+ for (MerkleTree tree : trees)
+ {
+ addTree(tree);
+ }
+ }
+
+ private void addTree(MerkleTree tree)
+ {
+ assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
+
+ merkleTrees.put(tree.fullRange, tree);
+ }
+
+ private boolean validateNonOverlapping(MerkleTree tree)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (tree.fullRange.intersects(range))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Get an iterator for all the invalids generated by the MerkleTrees.
+ *
+ * @return
+ */
+ public TreeRangeIterator invalids()
+ {
+ return new TreeRangeIterator();
+ }
+
+ /**
+ * Log the row count per leaf for all MerkleTrees.
+ *
+ * @param logger
+ */
+ public void logRowCountPerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowCountPerLeaf().log(logger);
+ }
+ }
+
+ /**
+ * Log the row size per leaf for all MerkleTrees.
+ *
+ * @param logger
+ */
+ public void logRowSizePerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowSizePerLeaf().log(logger);
+ }
+ }
+
+ @VisibleForTesting
+ public byte[] hash(Range<Token> range)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ boolean hashed = false;
+
+ try
+ {
+ for (Range<Token> rt : merkleTrees.keySet())
+ {
+ if (rt.intersects(range))
+ {
+ byte[] bytes = merkleTrees.get(rt).hash(range);
+ if (bytes != null)
+ {
+ baos.write(bytes);
+ hashed = true;
+ }
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to append merkle tree hash to result");
+ }
+
+ return hashed ? baos.toByteArray() : null;
+ }
+
+ /**
+ * Get an iterator of all ranges and their MerkleTrees.
+ */
+ public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
+ {
+ return merkleTrees.entrySet().iterator();
+ }
+
++ public long rowCount()
++ {
++ long totalCount = 0;
++ for (MerkleTree tree : merkleTrees.values())
++ {
++ totalCount += tree.rowCount();
++ }
++ return totalCount;
++ }
++
+ public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
+ Iterable<MerkleTree.TreeRange>,
+ PeekingIterator<MerkleTree.TreeRange>
+ {
+ private final Iterator<MerkleTree> it;
+
+ private MerkleTree.TreeRangeIterator current = null;
+
+ private TreeRangeIterator()
+ {
+ it = merkleTrees.values().iterator();
+ }
+
+ public MerkleTree.TreeRange computeNext()
+ {
+ if (current == null || !current.hasNext())
+ return nextIterator();
+
+ return current.next();
+ }
+
+ private MerkleTree.TreeRange nextIterator()
+ {
+ if (it.hasNext())
+ {
+ current = it.next().invalids();
+
+ return current.next();
+ }
+
+ return endOfData();
+ }
+
+ public Iterator<MerkleTree.TreeRange> iterator()
+ {
+ return this;
+ }
+ }
+
+ /**
+ * Get the differences between the two sets of MerkleTrees.
+ *
+ * @param ltree
+ * @param rtree
+ * @return
+ */
+ public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
+ {
+ List<Range<Token>> differences = new ArrayList<>();
+ for (MerkleTree tree : ltree.merkleTrees.values())
+ {
+ differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
+ }
+ return differences;
+ }
+
+ public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
+ {
+ public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeInt(trees.merkleTrees.size());
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ MerkleTree.serializer.serialize(tree, out, version);
+ }
+ }
+
+ public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
+ {
+ IPartitioner partitioner = null;
+ int nTrees = in.readInt();
+ Collection<MerkleTree> trees = new ArrayList<>(nTrees);
+ if (nTrees > 0)
+ {
+ for (int i = 0; i < nTrees; i++)
+ {
+ MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+ trees.add(tree);
+
+ if (partitioner == null)
+ partitioner = tree.partitioner();
+ else
+ assert tree.partitioner() == partitioner;
+ }
+ }
+
+ return new MerkleTrees(partitioner, trees);
+ }
+
+ public long serializedSize(MerkleTrees trees, int version)
+ {
+ assert trees != null;
+
+ long size = TypeSizes.sizeof(trees.merkleTrees.size());
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ size += MerkleTree.serializer.serializedSize(tree, version);
+ }
+ return size;
+ }
+
+ }
+
+ private static class TokenRangeComparator implements Comparator<Range<Token>>
+ {
+ @Override
+ public int compare(Range<Token> rt1, Range<Token> rt2)
+ {
+ if (rt1.left.compareTo(rt2.left) == 0)
+ return 0;
+
+ return rt1.compareTo(rt2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
+ CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
for (int i = startRowKey; i <= endRowKey; i++)
{
DecoratedKey key = Util.dk(Integer.toString(i));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.repair;
-import java.io.IOException;
import java.net.InetAddress;
-import java.security.MessageDigest;
+import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
- private final IPartitioner partitioner = StorageService.getPartitioner();
+ private static IPartitioner partitioner;
@BeforeClass
public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
public void testValidatorComplete() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success());
- assertNotNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@@ -130,37 -116,128 +118,111 @@@
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
++ assertTrue(((ValidationComplete) m).success());
++ assertNotNull(((ValidationComplete) m).trees);
}
- private static class CompactedRowStub extends AbstractCompactedRow
- {
- private CompactedRowStub(DecoratedKey key)
- {
- super(key);
- }
-
- public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- public void update(MessageDigest digest) { }
-
- public ColumnStats columnStats()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close() throws IOException { }
- }
@Test
public void testValidatorFailed() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
++ assertFalse(((ValidationComplete) m).success());
++ assertNull(((ValidationComplete) m).trees);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^ceil(log2(n))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+ cfs.forceBlockingFlush();
- assertEquals(1, cfs.getSSTables().size());
++ assertEquals(1, cfs.getLiveSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
- SSTableReader sstable = cfs.getSSTables().iterator().next();
++ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
- cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
- sstable.last.getToken()));
++ cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++ sstable.last.getToken())));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
- Collections.singletonList(cfs), Collections.singleton(desc.range),
- false, false);
++ Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++ false);
+
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- MerkleTree tree = ((ValidationComplete) m).tree;
++ assertTrue(((ValidationComplete) m).success());
++ MerkleTrees trees = ((ValidationComplete) m).trees;
+
- assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
- assertEquals(tree.rowCount(), n);
++ Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++ while (iterator.hasNext())
++ {
++ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++ }
++ assertEquals(trees.rowCount(), n);
+ }
+
- private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++ private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+ {
- final SettableFuture<MessageOut> future = SettableFuture.create();
++ final CompletableFuture<MessageOut> future = new CompletableFuture<>();
MessagingService.instance().addMessageSink(new IMessageSink()
{
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success());
- assertNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- future.set(message);
++ future.complete(message);
return false;
}
[02/10] cassandra git commit: Fix merkle tree depth calculation
Posted by yu...@apache.org.
Fix merkle tree depth calculation
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630
Branch: refs/heads/cassandra-3.0
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/CompactionManager.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++-------
6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
* Make Collections deserialization more robust (CASSANDRA-12618)
@@ -36,7 +37,6 @@
* Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
* Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
{
numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
public final int gcBefore;
+ private final boolean evenTreeDistribution;
// null when all rows with the min token have been consumed
private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
+ this(desc, initiator, gcBefore, false);
+ }
+
+ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+ {
this.desc = desc;
this.initiator = initiator;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
this.tree = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
return histbuild.buildWithStdevRangesAroundMean();
}
+ public long rowCount()
+ {
+ long count = 0;
+ for (TreeRange range : new TreeRangeIterator(this))
+ {
+ count += range.hashable.rowsInRange;
+ }
+ return count;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
for (int i = startRowKey; i <= endRowKey; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- @SuppressWarnings("unchecked")
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@ -131,8 +116,13 @@ public class ValidatorTest
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ assertNotNull(((ValidationComplete) m).tree);
}
private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertFalse(((ValidationComplete) m).success);
+ assertNull(((ValidationComplete) m).tree);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^(ceil(log2(n)))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+ cfs.forceBlockingFlush();
+ assertEquals(1, cfs.getSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+ sstable.last.getToken()));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs), Collections.singleton(desc.range),
+ false, false);
+
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ MerkleTree tree = ((ValidationComplete) m).tree;
+
+ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+ assertEquals(tree.rowCount(), n);
+ }
+
+ private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+ {
+ final SettableFuture<MessageOut> future = SettableFuture.create();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
+ future.set(message);
return false;
}
@@ -192,13 +246,6 @@ public class ValidatorTest
return false;
}
});
-
- InetAddress remote = InetAddress.getByName("127.0.0.2");
-
- Validator validator = new Validator(desc, remote, 0);
- validator.fail();
-
- if (!lock.isSignaled())
- lock.await();
+ return future;
}
}
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6
Branch: refs/heads/trunk
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
-2.2.9
+3.0.10
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+Merged from 2.2:
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
+ * Make Collections deserialization more robust (CASSANDRA-12618)
-
-
-2.2.8
* Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
* Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
* Decrement pending range calculator jobs counter in finally block
(CASSANDRA-12554)
+Merged from 2.1:
- * Make Collections deserialization more robust (CASSANDRA-12618)
+ * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
+
+
+3.0.9
+ * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
+ * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
+ * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
+ * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
+ * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
+ * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
+ * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
+ * Fix file handle leaks due to simultaneous compaction/repair and
+ listing snapshots, calculating snapshot sizes, or making schema
+ changes (CASSANDRA-11594)
+ * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
+ * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
+ * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
+ * Calculate last compacted key on startup (CASSANDRA-6216)
+ * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
+ * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
+ * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
+ * Backport CASSANDRA-12002 (CASSANDRA-12177)
+ * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
+ * Fix potential bad messaging service message for paged range reads
+ within mixed-version 3.x clusters (CASSANDRA-12249)
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
+ * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerException when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+ * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
+ * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
+Merged from 2.2:
* Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
* cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
* Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
- gcBefore = getDefaultGcBefore(cfs);
+ gcBefore = getDefaultGcBefore(cfs, nowInSec);
}
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
- int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
-
+ // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+ // We blindly assume that a partition is evenly distributed on all sstables for now.
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
+ ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
+ CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (ci.hasNext())
{
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ try (UnfilteredRowIterator partition = ci.next())
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ validator.add(partition);
}
- validator.complete();
}
- finally
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation && !isGlobalSnapshotValidation)
{
// we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
// is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
}
}
+ private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+ {
+ MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+ long allPartitions = 0;
+ Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
+ rangePartitionCounts.put(range, numPartitions);
+ allPartitions += numPartitions;
+ }
+
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = rangePartitionCounts.get(range);
+ double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++ // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++ // capping at 20 to prevent large tree (CASSANDRA-11390)
+ int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++ // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
+ tree.addMerkleTree((int) Math.pow(2, depth), range);
+ }
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
+ }
+
+ return tree;
+ }
+
private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
{
Refs<SSTableReader> sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
- public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
+ public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
{
- this.tree = tree;
+ this.trees = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+
+/**
+ * Wrapper class for handling of multiple MerkleTrees at once.
+ *
+ * The MerkleTrees are divided into non-overlapping token ranges.
+ */
+public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
+{
+ public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
+
+ private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+
+ private IPartitioner partitioner;
+
+ /**
+ * Creates empty MerkleTrees object.
+ *
+ * @param partitioner The partitioner to use
+ */
+ public MerkleTrees(IPartitioner partitioner)
+ {
+ this(partitioner, new ArrayList<>());
+ }
+
+ private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
+ {
+ this.partitioner = partitioner;
+ addTrees(merkleTrees);
+ }
+
+ /**
+ * Get the ranges that these merkle trees cover.
+ *
+ * @return
+ */
+ public Collection<Range<Token>> ranges()
+ {
+ return merkleTrees.keySet();
+ }
+
+ /**
+ * Get the partitioner in use.
+ *
+ * @return
+ */
+ public IPartitioner partitioner()
+ {
+ return partitioner;
+ }
+
+ /**
+ * Add merkle tree's with the defined maxsize and ranges.
+ *
+ * @param maxsize
+ * @param ranges
+ */
+ public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
+ {
+ for (Range<Token> range : ranges)
+ {
+ addMerkleTree(maxsize, range);
+ }
+ }
+
+ /**
+ * Add a MerkleTree with the defined size and range.
+ *
+ * @param maxsize
+ * @param range
+ * @return The created merkle tree.
+ */
+ public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
+ {
+ return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
+ }
+
+ @VisibleForTesting
+ public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
+ {
+ MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
+ addTree(tree);
+
+ return tree;
+ }
+
+ /**
+ * Get the MerkleTree.Range responsible for the given token.
+ *
+ * @param t
+ * @return
+ */
+ @VisibleForTesting
+ public MerkleTree.TreeRange get(Token t)
+ {
+ return getMerkleTree(t).get(t);
+ }
+
+ /**
+ * Init all MerkleTree's with an even tree distribution.
+ */
+ public void init()
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ init(range);
+ }
+ }
+
+ /**
+ * Init a selected MerkleTree with an even tree distribution.
+ *
+ * @param range
+ */
+ public void init(Range<Token> range)
+ {
+ merkleTrees.get(range).init();
+ }
+
+ /**
+ * Split the MerkleTree responsible for the given token.
+ *
+ * @param t
+ * @return
+ */
+ public boolean split(Token t)
+ {
+ return getMerkleTree(t).split(t);
+ }
+
+ /**
+ * Invalidate the MerkleTree responsible for the given token.
+ *
+ * @param t
+ */
+ @VisibleForTesting
+ public void invalidate(Token t)
+ {
+ getMerkleTree(t).invalidate(t);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token range.
+ *
+ * @param range
+ * @return
+ */
+ public MerkleTree getMerkleTree(Range<Token> range)
+ {
+ return merkleTrees.get(range);
+ }
+
+ public long size()
+ {
+ long size = 0;
+
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ size += tree.size();
+ }
+
+ return size;
+ }
+
+ @VisibleForTesting
+ public void maxsize(Range<Token> range, int maxsize)
+ {
+ getMerkleTree(range).maxsize(maxsize);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token.
+ *
+ * @param t
+ * @return The MerkleTree whose range contains the token; an AssertionError is thrown if none exists.
+ */
+ private MerkleTree getMerkleTree(Token t)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (range.contains(t))
+ return merkleTrees.get(range);
+ }
+
+ throw new AssertionError("Expected tree for token " + t);
+ }
+
+ private void addTrees(Collection<MerkleTree> trees)
+ {
+ for (MerkleTree tree : trees)
+ {
+ addTree(tree);
+ }
+ }
+
+ private void addTree(MerkleTree tree)
+ {
+ assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
+
+ merkleTrees.put(tree.fullRange, tree);
+ }
+
+ private boolean validateNonOverlapping(MerkleTree tree)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (tree.fullRange.intersects(range))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Get an iterator for all the invalids generated by the MerkleTrees.
+ *
+ * @return
+ */
+ public TreeRangeIterator invalids()
+ {
+ return new TreeRangeIterator();
+ }
+
+ /**
+ * Log the row count per leaf for all MerkleTrees.
+ *
+ * @param logger
+ */
+ public void logRowCountPerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowCountPerLeaf().log(logger);
+ }
+ }
+
+ /**
+ * Log the row size per leaf for all MerkleTrees.
+ *
+ * @param logger
+ */
+ public void logRowSizePerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowSizePerLeaf().log(logger);
+ }
+ }
+
+ @VisibleForTesting
+ public byte[] hash(Range<Token> range)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ boolean hashed = false;
+
+ try
+ {
+ for (Range<Token> rt : merkleTrees.keySet())
+ {
+ if (rt.intersects(range))
+ {
+ byte[] bytes = merkleTrees.get(rt).hash(range);
+ if (bytes != null)
+ {
+ baos.write(bytes);
+ hashed = true;
+ }
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to append merkle tree hash to result");
+ }
+
+ return hashed ? baos.toByteArray() : null;
+ }
+
+ /**
+ * Get an iterator of all ranges and their MerkleTrees.
+ */
+ public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
+ {
+ return merkleTrees.entrySet().iterator();
+ }
+
++ public long rowCount()
++ {
++ long totalCount = 0;
++ for (MerkleTree tree : merkleTrees.values())
++ {
++ totalCount += tree.rowCount();
++ }
++ return totalCount;
++ }
++
+ public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
+ Iterable<MerkleTree.TreeRange>,
+ PeekingIterator<MerkleTree.TreeRange>
+ {
+ private final Iterator<MerkleTree> it;
+
+ private MerkleTree.TreeRangeIterator current = null;
+
+ private TreeRangeIterator()
+ {
+ it = merkleTrees.values().iterator();
+ }
+
+ public MerkleTree.TreeRange computeNext()
+ {
+ if (current == null || !current.hasNext())
+ return nextIterator();
+
+ return current.next();
+ }
+
+ private MerkleTree.TreeRange nextIterator()
+ {
+ if (it.hasNext())
+ {
+ current = it.next().invalids();
+
+ return current.next();
+ }
+
+ return endOfData();
+ }
+
+ public Iterator<MerkleTree.TreeRange> iterator()
+ {
+ return this;
+ }
+ }
+
+ /**
+ * Get the differences between the two sets of MerkleTrees.
+ *
+ * @param ltree
+ * @param rtree
+ * @return
+ */
+ public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
+ {
+ List<Range<Token>> differences = new ArrayList<>();
+ for (MerkleTree tree : ltree.merkleTrees.values())
+ {
+ differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
+ }
+ return differences;
+ }
+
+ public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
+ {
+ public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeInt(trees.merkleTrees.size());
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ MerkleTree.serializer.serialize(tree, out, version);
+ }
+ }
+
+ public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
+ {
+ IPartitioner partitioner = null;
+ int nTrees = in.readInt();
+ Collection<MerkleTree> trees = new ArrayList<>(nTrees);
+ if (nTrees > 0)
+ {
+ for (int i = 0; i < nTrees; i++)
+ {
+ MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+ trees.add(tree);
+
+ if (partitioner == null)
+ partitioner = tree.partitioner();
+ else
+ assert tree.partitioner() == partitioner;
+ }
+ }
+
+ return new MerkleTrees(partitioner, trees);
+ }
+
+ public long serializedSize(MerkleTrees trees, int version)
+ {
+ assert trees != null;
+
+ long size = TypeSizes.sizeof(trees.merkleTrees.size());
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ size += MerkleTree.serializer.serializedSize(tree, version);
+ }
+ return size;
+ }
+
+ }
+
+ private static class TokenRangeComparator implements Comparator<Range<Token>>
+ {
+ @Override
+ public int compare(Range<Token> rt1, Range<Token> rt2)
+ {
+ if (rt1.left.compareTo(rt2.left) == 0)
+ return 0;
+
+ return rt1.compareTo(rt2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
+ CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
for (int i = startRowKey; i <= endRowKey; i++)
{
DecoratedKey key = Util.dk(Integer.toString(i));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.repair;
-import java.io.IOException;
import java.net.InetAddress;
-import java.security.MessageDigest;
+import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
- private final IPartitioner partitioner = StorageService.getPartitioner();
+ private static IPartitioner partitioner;
@BeforeClass
public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
public void testValidatorComplete() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success());
- assertNotNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@@ -130,37 -116,128 +118,111 @@@
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
++ assertTrue(((ValidationComplete) m).success());
++ assertNotNull(((ValidationComplete) m).trees);
}
- private static class CompactedRowStub extends AbstractCompactedRow
- {
- private CompactedRowStub(DecoratedKey key)
- {
- super(key);
- }
-
- public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- public void update(MessageDigest digest) { }
-
- public ColumnStats columnStats()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close() throws IOException { }
- }
@Test
public void testValidatorFailed() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
++ assertFalse(((ValidationComplete) m).success());
++ assertNull(((ValidationComplete) m).trees);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^ceil(log2(n))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); // ttl=0
+
+ cfs.forceBlockingFlush();
- assertEquals(1, cfs.getSSTables().size());
++ assertEquals(1, cfs.getLiveSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
- SSTableReader sstable = cfs.getSSTables().iterator().next();
++ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
- cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
- sstable.last.getToken()));
++ cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++ sstable.last.getToken())));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
- Collections.singletonList(cfs), Collections.singleton(desc.range),
- false, false);
++ Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++ false);
+
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- MerkleTree tree = ((ValidationComplete) m).tree;
++ assertTrue(((ValidationComplete) m).success());
++ MerkleTrees trees = ((ValidationComplete) m).trees;
+
- assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
- assertEquals(tree.rowCount(), n);
++ Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++ while (iterator.hasNext())
++ {
++ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++ }
++ assertEquals(trees.rowCount(), n);
+ }
+
- private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++ private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+ {
- final SettableFuture<MessageOut> future = SettableFuture.create();
++ final CompletableFuture<MessageOut> future = new CompletableFuture<>();
MessagingService.instance().addMessageSink(new IMessageSink()
{
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success());
- assertNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- future.set(message);
++ future.complete(message);
return false;
}
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.X
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3b34dc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3b34dc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3b34dc8
Branch: refs/heads/trunk
Commit: e3b34dc8584373c9f503e830ff3241e0865ab994
Parents: 57e9a83 413e48e
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:12 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:12 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c33b1d3,9076e7a..ba08745
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,59 -35,12 +91,60 @@@ Merged from 3.0
* Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
* Calculate last compacted key on startup (CASSANDRA-6216)
* Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+Merged from 2.2:
++ * Fix merkle tree depth calculation (CASSANDRA-12580)
+ * Make Collections deserialization more robust (CASSANDRA-12618)
+ * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
+ * Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
+ * Decrement pending range calculator jobs counter in finally block
+ * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
+ * Fail repair on non-existing table (CASSANDRA-12279)
+ * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
+
+
+3.8, 3.9
+ * Fix value skipping with counter columns (CASSANDRA-11726)
+ * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
+ * Fixed flaky SSTablesIteratedTest (CASSANDRA-12282)
+ * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
+ * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
+ * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
+ * Fix hdr logging for single operation workloads (CASSANDRA-12145)
+ * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
+ * Increase size of flushExecutor thread pool (CASSANDRA-12071)
+ * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
+ * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
+ * Improve details in compaction log message (CASSANDRA-12080)
+ * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
+ * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
+ * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
+ * Move skip_stop_words filter before stemming (CASSANDRA-12078)
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
+ * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
+ * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
+ * Add cross-DC latency metrics (CASSANDRA-11596)
+ * Allow terms in selection clause (CASSANDRA-10783)
+ * Add bind variables to trace (CASSANDRA-11719)
+ * Switch counter shards' clock to timestamps (CASSANDRA-9811)
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
+ * Add Change-Data-Capture (CASSANDRA-8844)
+Merged from 3.0:
+ * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
* Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
* Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
- * Backport CASSANDRA-12002 (CASSANDRA-12177)
* Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
- * Fix potential bad messaging service message for paged range reads
- within mixed-version 3.x clusters (CASSANDRA-12249)
* Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
* NullPointerException during compaction on table with static columns (CASSANDRA-12336)
* Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
[04/10] cassandra git commit: Fix merkle tree depth calculation
Posted by yu...@apache.org.
Fix merkle tree depth calculation
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630
Branch: refs/heads/trunk
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/CompactionManager.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++-------
6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
* Make Collections deserialization more robust (CASSANDRA-12618)
@@ -36,7 +37,6 @@
* Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
* Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
{
numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
public final int gcBefore;
+ private final boolean evenTreeDistribution;
// null when all rows with the min token have been consumed
private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
+ this(desc, initiator, gcBefore, false);
+ }
+
+ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+ {
this.desc = desc;
this.initiator = initiator;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
this.tree = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
return histbuild.buildWithStdevRangesAroundMean();
}
+ public long rowCount()
+ {
+ long count = 0;
+ for (TreeRange range : new TreeRangeIterator(this))
+ {
+ count += range.hashable.rowsInRange;
+ }
+ return count;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
for (int i = startRowKey; i <= endRowKey; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- @SuppressWarnings("unchecked")
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@ -131,8 +116,13 @@ public class ValidatorTest
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ assertNotNull(((ValidationComplete) m).tree);
}
private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertFalse(((ValidationComplete) m).success);
+ assertNull(((ValidationComplete) m).tree);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^ceil(log2(n))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); // ttl=0
+
+ cfs.forceBlockingFlush();
+ assertEquals(1, cfs.getSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+ sstable.last.getToken()));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs), Collections.singleton(desc.range),
+ false, false);
+
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ MerkleTree tree = ((ValidationComplete) m).tree;
+
+ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+ assertEquals(tree.rowCount(), n);
+ }
+
+ private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+ {
+ final SettableFuture<MessageOut> future = SettableFuture.create();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
+ future.set(message);
return false;
}
@@ -192,13 +246,6 @@ public class ValidatorTest
return false;
}
});
-
- InetAddress remote = InetAddress.getByName("127.0.0.2");
-
- Validator validator = new Validator(desc, remote, 0);
- validator.fail();
-
- if (!lock.isSignaled())
- lock.await();
+ return future;
}
}
[03/10] cassandra git commit: Fix merkle tree depth calculation
Posted by yu...@apache.org.
Fix merkle tree depth calculation
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c70ce630
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c70ce630
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c70ce630
Branch: refs/heads/cassandra-3.X
Commit: c70ce6307da824529762ff40673642b6f86972aa
Parents: 2383935
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue Aug 30 21:06:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:26:53 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/CompactionManager.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 159 ++++++++++++-------
6 files changed, 125 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 998849e..97bc70a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
* Make Collections deserialization more robust (CASSANDRA-12618)
@@ -36,7 +37,6 @@
* Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
* Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
- * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cf82498..78fa23c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1112,8 +1112,8 @@ public class CompactionManager implements CompactionManagerMBean
{
numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
+ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..8dbb4cf 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -54,6 +54,7 @@ public class Validator implements Runnable
public final RepairJobDesc desc;
public final InetAddress initiator;
public final int gcBefore;
+ private final boolean evenTreeDistribution;
// null when all rows with the min token have been consumed
private long validated;
@@ -67,19 +68,25 @@ public class Validator implements Runnable
public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
{
+ this(desc, initiator, gcBefore, false);
+ }
+
+ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution)
+ {
this.desc = desc;
this.initiator = initiator;
this.gcBefore = gcBefore;
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
{
this.tree = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 4fec62d..1e0f505 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -516,6 +516,16 @@ public class MerkleTree implements Serializable
return histbuild.buildWithStdevRangesAroundMean();
}
+ public long rowCount()
+ {
+ long count = 0;
+ for (TreeRange range : new TreeRangeIterator(this))
+ {
+ count += range.hashable.rowsInRange;
+ }
+ return count;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8ff3022..471f8cf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -125,7 +125,7 @@ public class CompactionsTest
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
for (int i = startRowKey; i <= endRowKey; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c70ce630/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index a9f18f5..61ab3da 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -20,8 +20,16 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionsTest;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
@@ -46,15 +54,20 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.*;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
private final IPartitioner partitioner = StorageService.getPartitioner();
@@ -81,35 +94,7 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- @SuppressWarnings("unchecked")
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@ -131,8 +116,13 @@ public class ValidatorTest
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ assertNotNull(((ValidationComplete) m).tree);
}
private static class CompactedRowStub extends AbstractCompactedRow
@@ -163,27 +153,91 @@ public class ValidatorTest
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
- final SimpleCondition lock = new SimpleCondition();
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertFalse(((ValidationComplete) m).success);
+ assertNull(((ValidationComplete) m).tree);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect merkle tree with size 2^ceil(log2(n))
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=0
+
+ cfs.forceBlockingFlush();
+ assertEquals(1, cfs.getSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
+ sstable.last.getToken()));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs), Collections.singleton(desc.range),
+ false, false);
+
+ final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
+ assertTrue(((ValidationComplete) m).success);
+ MerkleTree tree = ((ValidationComplete) m).tree;
+
+ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
+ assertEquals(tree.rowCount(), n);
+ }
+
+ private ListenableFuture<MessageOut> registerOutgoingMessageSink()
+ {
+ final SettableFuture<MessageOut> future = SettableFuture.create();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- @SuppressWarnings("unchecked")
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
- }
- }
- finally
- {
- lock.signalAll();
- }
+ future.set(message);
return false;
}
@@ -192,13 +246,6 @@ public class ValidatorTest
return false;
}
});
-
- InetAddress remote = InetAddress.getByName("127.0.0.2");
-
- Validator validator = new Validator(desc, remote, 0);
- validator.fail();
-
- if (!lock.isSignaled())
- lock.await();
+ return future;
}
}
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.X
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3b34dc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3b34dc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3b34dc8
Branch: refs/heads/cassandra-3.X
Commit: e3b34dc8584373c9f503e830ff3241e0865ab994
Parents: 57e9a83 413e48e
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:12 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:12 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c33b1d3,9076e7a..ba08745
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -91,59 -35,12 +91,60 @@@ Merged from 3.0
* Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
* Calculate last compacted key on startup (CASSANDRA-6216)
* Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+Merged from 2.2:
++ * Fix merkle tree depth calculation (CASSANDRA-12580)
+ * Make Collections deserialization more robust (CASSANDRA-12618)
+ * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
+ * Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
+ * Decrement pending range calculator jobs counter in finally block
+ * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
+ * Fail repair on non-existing table (CASSANDRA-12279)
+ * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
+
+
+3.8, 3.9
+ * Fix value skipping with counter columns (CASSANDRA-11726)
+ * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
+ * Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
+ * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
+ * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
+ * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
+ * Fix hdr logging for single operation workloads (CASSANDRA-12145)
+ * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
+ * Increase size of flushExecutor thread pool (CASSANDRA-12071)
+ * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
+ * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
+ * Improve details in compaction log message (CASSANDRA-12080)
+ * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
+ * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
+ * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
+ * Move skip_stop_words filter before stemming (CASSANDRA-12078)
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
+ * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
+ * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
+ * Add cross-DC latency metrics (CASSANDRA-11596)
+ * Allow terms in selection clause (CASSANDRA-10783)
+ * Add bind variables to trace (CASSANDRA-11719)
+ * Switch counter shards' clock to timestamps (CASSANDRA-9811)
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
+ * Add Change-Data-Capture (CASSANDRA-8844)
+Merged from 3.0:
+ * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
* Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
* Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
- * Backport CASSANDRA-12002 (CASSANDRA-12177)
* Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
- * Fix potential bad messaging service message for paged range reads
- within mixed-version 3.x clusters (CASSANDRA-12249)
* Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
* NullPointerException during compaction on table with static columns (CASSANDRA-12336)
* Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3b34dc8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6
Branch: refs/heads/cassandra-3.X
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
-2.2.9
+3.0.10
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+Merged from 2.2:
+ * Fix merkle tree depth calculation (CASSANDRA-12580)
+ * Make Collections deserialization more robust (CASSANDRA-12618)
-
-
-2.2.8
* Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
* Fix authentication problem when invoking cqlsh copy from a SOURCE command (CASSANDRA-12642)
* Decrement pending range calculator jobs counter in finally block
(CASSANDRA-12554)
+Merged from 2.1:
- * Make Collections deserialization more robust (CASSANDRA-12618)
+ * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
+
+
+3.0.9
+ * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
+ * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
+ * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
+ * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
+ * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
+ * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
+ * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
+ * Fix file handle leaks due to simultaneous compaction/repair and
+ listing snapshots, calculating snapshot sizes, or making schema
+ changes (CASSANDRA-11594)
+ * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
+ * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
+ * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
+ * Calculate last compacted key on startup (CASSANDRA-6216)
+ * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
+ * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
+ * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
+ * Backport CASSANDRA-12002 (CASSANDRA-12177)
+ * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
+ * Fix potential bad messaging service message for paged range reads
+ within mixed-version 3.x clusters (CASSANDRA-12249)
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
+ * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+ * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
+ * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
+Merged from 2.2:
* Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
* cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
* Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
- gcBefore = getDefaultGcBefore(cfs);
+ gcBefore = getDefaultGcBefore(cfs, nowInSec);
}
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
- int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
-
+ // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+ // We blindly assume that a partition is evenly distributed on all sstables for now.
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
+ ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
+ CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (ci.hasNext())
{
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ try (UnfilteredRowIterator partition = ci.next())
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ validator.add(partition);
}
- validator.complete();
}
- finally
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation && !isGlobalSnapshotValidation)
{
// we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
// is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
}
}
+ private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+ {
+ MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+ long allPartitions = 0;
+ Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
+ rangePartitionCounts.put(range, numPartitions);
+ allPartitions += numPartitions;
+ }
+
+ for (Range<Token> range : ranges)
+ {
+ long numPartitions = rangePartitionCounts.get(range);
+ double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++ // determine max tree depth proportional to range size to avoid blowing up memory with multiple trees,
++ // capping at 20 to prevent large tree (CASSANDRA-11390)
+ int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++ // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
++ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
+ tree.addMerkleTree((int) Math.pow(2, depth), range);
+ }
+ if (logger.isDebugEnabled())
+ {
+ // MT serialize may take time
+ logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
+ }
+
+ return tree;
+ }
+
private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
{
Refs<SSTableReader> sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
validated = 0;
range = null;
ranges = null;
+ this.evenTreeDistribution = evenTreeDistribution;
}
- public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
+ public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
{
- this.tree = tree;
+ this.trees = tree;
- if (!tree.partitioner().preservesOrder())
+ if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
{
// You can't beat an even tree distribution for md5
tree.init();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+
+/**
+ * Wrapper class for handling of multiple MerkleTrees at once.
+ *
+ * The MerkleTree's are divided in Ranges of non-overlapping tokens.
+ */
+public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
+{
+ public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
+
+ private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+
+ private IPartitioner partitioner;
+
+ /**
+ * Creates empty MerkleTrees object.
+ *
+ * @param partitioner The partitioner to use
+ */
+ public MerkleTrees(IPartitioner partitioner)
+ {
+ this(partitioner, new ArrayList<>());
+ }
+
+ private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
+ {
+ this.partitioner = partitioner;
+ addTrees(merkleTrees);
+ }
+
+ /**
+ * Get the token ranges covered by these merkle trees.
+ *
+ * @return the key set of the backing range-to-tree map (a view of the map, not a copy)
+ */
+ public Collection<Range<Token>> ranges()
+ {
+ return merkleTrees.keySet();
+ }
+
+ /**
+ * Get the partitioner in use.
+ *
+ * @return the partitioner this instance was constructed with
+ */
+ public IPartitioner partitioner()
+ {
+ return partitioner;
+ }
+
+ /**
+ * Add one merkle tree per given range, all created with the same maxsize.
+ *
+ * @param maxsize maximum size handed to every tree that is created
+ * @param ranges token ranges to create trees for, one tree per range
+ */
+ public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
+ {
+ for (Range<Token> range : ranges)
+ {
+ addMerkleTree(maxsize, range);
+ }
+ }
+
+ /**
+ * Add a MerkleTree with the defined size and range, using MerkleTree.RECOMMENDED_DEPTH as its hash depth.
+ *
+ * @param maxsize maximum size handed to the new tree
+ * @param range token range the new tree is created for
+ * @return The created merkle tree.
+ */
+ public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
+ {
+ return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
+ }
+
+ @VisibleForTesting
+ public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
+ {
+ MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
+ addTree(tree);
+
+ return tree;
+ }
+
+ /**
+ * Get the MerkleTree.TreeRange responsible for the given token.
+ *
+ * @param t token to look up
+ * @return the result of get(t) on the tree whose range contains t
+ */
+ @VisibleForTesting
+ public MerkleTree.TreeRange get(Token t)
+ {
+ return getMerkleTree(t).get(t);
+ }
+
+ /**
+ * Init all MerkleTree's with an even tree distribution.
+ */
+ public void init()
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ init(range);
+ }
+ }
+
+ /**
+ * Init a selected MerkleTree with an even tree distribution.
+ *
+ * @param range key of the tree to initialise; it is looked up directly in the backing map
+ */
+ public void init(Range<Token> range)
+ {
+ merkleTrees.get(range).init();
+ }
+
+ /**
+ * Split the MerkleTree responsible for the given token.
+ *
+ * @param t token selecting the tree (the one whose range contains it) and passed on to its split call
+ * @return whatever MerkleTree.split(t) returns for that tree
+ */
+ public boolean split(Token t)
+ {
+ return getMerkleTree(t).split(t);
+ }
+
+ /**
+ * Invalidate the MerkleTree responsible for the given token.
+ *
+ * @param t token selecting the tree (the one whose range contains it) and passed on to its invalidate call
+ */
+ @VisibleForTesting
+ public void invalidate(Token t)
+ {
+ getMerkleTree(t).invalidate(t);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token range.
+ *
+ * @param range range used as the lookup key in the backing map
+ * @return the mapped tree, or null when the map holds no entry for that key
+ */
+ public MerkleTree getMerkleTree(Range<Token> range)
+ {
+ return merkleTrees.get(range);
+ }
+
+ public long size()
+ {
+ long size = 0;
+
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ size += tree.size();
+ }
+
+ return size;
+ }
+
+ @VisibleForTesting
+ public void maxsize(Range<Token> range, int maxsize)
+ {
+ getMerkleTree(range).maxsize(maxsize);
+ }
+
+ /**
+ * Get the MerkleTree responsible for the given token.
+ *
+ * @param t token to locate
+ * @return The first MerkleTree whose range contains t; never null, an AssertionError is thrown if none does.
+ */
+ private MerkleTree getMerkleTree(Token t)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (range.contains(t))
+ return merkleTrees.get(range);
+ }
+
+ throw new AssertionError("Expected tree for token " + t);
+ }
+
+ private void addTrees(Collection<MerkleTree> trees)
+ {
+ for (MerkleTree tree : trees)
+ {
+ addTree(tree);
+ }
+ }
+
+ private void addTree(MerkleTree tree)
+ {
+ assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
+
+ merkleTrees.put(tree.fullRange, tree);
+ }
+
+ private boolean validateNonOverlapping(MerkleTree tree)
+ {
+ for (Range<Token> range : merkleTrees.keySet())
+ {
+ if (tree.fullRange.intersects(range))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Get an iterator for all the invalids generated by the MerkleTrees.
+ *
+ * @return a new TreeRangeIterator that walks the invalids() of each contained tree in turn
+ */
+ public TreeRangeIterator invalids()
+ {
+ return new TreeRangeIterator();
+ }
+
+ /**
+ * Log the row count per leaf for all MerkleTrees.
+ *
+ * @param logger logger that each tree's row-count-per-leaf histogram is asked to log to
+ */
+ public void logRowCountPerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowCountPerLeaf().log(logger);
+ }
+ }
+
+ /**
+ * Log the row size per leaf for all MerkleTrees.
+ *
+ * @param logger logger that each tree's row-size-per-leaf histogram is asked to log to
+ */
+ public void logRowSizePerLeaf(Logger logger)
+ {
+ for (MerkleTree tree : merkleTrees.values())
+ {
+ tree.histogramOfRowSizePerLeaf().log(logger);
+ }
+ }
+
+ @VisibleForTesting
+ public byte[] hash(Range<Token> range) // concatenates the hashes of every tree whose range intersects the argument
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ boolean hashed = false; // flips to true once any tree contributes a non-null hash
+
+ try
+ {
+ for (Range<Token> rt : merkleTrees.keySet())
+ {
+ if (rt.intersects(range))
+ {
+ byte[] bytes = merkleTrees.get(rt).hash(range);
+ if (bytes != null)
+ {
+ baos.write(bytes);
+ hashed = true;
+ }
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Unable to append merkle tree hash to result", e); // chain the cause instead of discarding it
+ }
+
+ return hashed ? baos.toByteArray() : null; // null (not an empty array) when nothing was hashed
+ }
+
+ /**
+ * Get an iterator of all ranges and their MerkleTrees.
+ */
+ public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
+ {
+ return merkleTrees.entrySet().iterator();
+ }
+
++ public long rowCount()
++ {
++ long totalCount = 0;
++ for (MerkleTree tree : merkleTrees.values())
++ {
++ totalCount += tree.rowCount();
++ }
++ return totalCount;
++ }
++
+ public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
+ Iterable<MerkleTree.TreeRange>,
+ PeekingIterator<MerkleTree.TreeRange>
+ {
+ private final Iterator<MerkleTree> it;
+
+ private MerkleTree.TreeRangeIterator current = null;
+
+ private TreeRangeIterator()
+ {
+ it = merkleTrees.values().iterator();
+ }
+
+ public MerkleTree.TreeRange computeNext()
+ {
+ if (current == null || !current.hasNext())
+ return nextIterator();
+
+ return current.next();
+ }
+
+ private MerkleTree.TreeRange nextIterator()
+ {
+ if (it.hasNext())
+ {
+ current = it.next().invalids();
+
+ return current.next();
+ }
+
+ return endOfData();
+ }
+
+ public Iterator<MerkleTree.TreeRange> iterator()
+ {
+ return this;
+ }
+ }
+
+ /**
+ * Get the differences between the two sets of MerkleTrees.
+ *
+ * @param ltree trees that are iterated; each one is compared against its counterpart in rtree
+ * @param rtree trees looked up by each left tree's fullRange (NOTE(review): a missing range yields null here; confirm MerkleTree.difference tolerates it)
+ * @return the MerkleTree.difference results of every left tree, appended in the left map's iteration order
+ */
+ public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
+ {
+ List<Range<Token>> differences = new ArrayList<>();
+ for (MerkleTree tree : ltree.merkleTrees.values())
+ {
+ differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
+ }
+ return differences;
+ }
+
+ public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees> // (de)serializes a MerkleTrees by delegating each tree to MerkleTree.serializer
+ {
+ public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeInt(trees.merkleTrees.size()); // layout: int tree count, then every tree in the backing map's iteration order
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ MerkleTree.serializer.serialize(tree, out, version);
+ }
+ }
+
+ public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
+ {
+ IPartitioner partitioner = null; // stays null when the stream declares zero trees
+ int nTrees = in.readInt();
+ Collection<MerkleTree> trees = new ArrayList<>(nTrees);
+ if (nTrees > 0)
+ {
+ for (int i = 0; i < nTrees; i++)
+ {
+ MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+ trees.add(tree);
+
+ if (partitioner == null)
+ partitioner = tree.partitioner(); // the first tree read decides the partitioner of the result
+ else
+ assert tree.partitioner() == partitioner; // identity check: later trees must carry that same instance
+ }
+ }
+
+ return new MerkleTrees(partitioner, trees);
+ }
+
+ public long serializedSize(MerkleTrees trees, int version)
+ {
+ assert trees != null;
+
+ long size = TypeSizes.sizeof(trees.merkleTrees.size()); // accounts for the tree-count field that serialize writes first
+ for (MerkleTree tree : trees.merkleTrees.values())
+ {
+ size += MerkleTree.serializer.serializedSize(tree, version);
+ }
+ return size;
+ }
+
+ }
+
+ private static class TokenRangeComparator implements Comparator<Range<Token>> // key ordering for the range-to-tree TreeMap
+ {
+ @Override
+ public int compare(Range<Token> rt1, Range<Token> rt2)
+ {
+ if (rt1.left.compareTo(rt2.left) == 0)
+ return 0; // ranges sharing a left token compare as equal, whatever their right token is
+
+ return rt1.compareTo(rt2); // otherwise defer to Range's own ordering
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
return store;
}
- private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+ public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
{
long timestamp = System.currentTimeMillis();
+ CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
for (int i = startRowKey; i <= endRowKey; i++)
{
DecoratedKey key = Util.dk(Integer.toString(i));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.repair;
-import java.io.IOException;
import java.net.InetAddress;
-import java.security.MessageDigest;
+import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.SequentialWriter;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.SimpleCondition;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class ValidatorTest
{
+ private static final long TEST_TIMEOUT = 60; //seconds
+
private static final String keyspace = "ValidatorTest";
private static final String columnFamily = "Standard1";
- private final IPartitioner partitioner = StorageService.getPartitioner();
+ private static IPartitioner partitioner;
@BeforeClass
public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
public void testValidatorComplete() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- MessagingService.instance().addMessageSink(new IMessageSink()
- {
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
- {
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success());
- assertNotNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- return false;
- }
-
- public boolean allowIncomingMessage(MessageIn message, int id)
- {
- return false;
- }
- });
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
InetAddress remote = InetAddress.getByName("127.0.0.2");
@@@ -130,37 -116,128 +118,111 @@@
Token min = tree.partitioner().getMinimumToken();
assertNotNull(tree.hash(new Range<>(min, min)));
- if (!lock.isSignaled())
- lock.await();
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
++ assertTrue(((ValidationComplete) m).success());
++ assertNotNull(((ValidationComplete) m).trees);
}
- private static class CompactedRowStub extends AbstractCompactedRow
- {
- private CompactedRowStub(DecoratedKey key)
- {
- super(key);
- }
-
- public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- public void update(MessageDigest digest) { }
-
- public ColumnStats columnStats()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close() throws IOException { }
- }
@Test
public void testValidatorFailed() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
- final SimpleCondition lock = new SimpleCondition();
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+
+ InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+ Validator validator = new Validator(desc, remote, 0);
+ validator.fail();
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
++ assertFalse(((ValidationComplete) m).success());
++ assertNull(((ValidationComplete) m).trees);
+ }
+
+ @Test
+ public void simpleValidationTest128() throws Exception
+ {
+ simpleValidationTest(128);
+ }
+
+ @Test
+ public void simpleValidationTest1500() throws Exception
+ {
+ simpleValidationTest(1500);
+ }
+
+ /**
+ * Test for CASSANDRA-5263
+ * 1. Create N rows
+ * 2. Run validation compaction
+ * 3. Expect each merkle tree to have size 2^ceil(log2(n)) and all trees together to hold n rows
+ */
+ public void simpleValidationTest(int n) throws Exception
+ {
+ Keyspace ks = Keyspace.open(keyspace);
+ ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+ cfs.clearUnsafe();
+
+ // disable compaction while flushing
+ cfs.disableAutoCompaction();
+
+ CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+
+ cfs.forceBlockingFlush();
- assertEquals(1, cfs.getSSTables().size());
++ assertEquals(1, cfs.getLiveSSTables().size());
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
- SSTableReader sstable = cfs.getSSTables().iterator().next();
++ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+ UUID repairSessionId = UUIDGen.getTimeUUID();
+ final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
- cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
- sstable.last.getToken()));
++ cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++ sstable.last.getToken())));
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
- Collections.singletonList(cfs), Collections.singleton(desc.range),
- false, false);
++ Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++ false);
+
- final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+ CompactionManager.instance.submitValidation(cfs, validator);
+
+ MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- MerkleTree tree = ((ValidationComplete) m).tree;
++ assertTrue(((ValidationComplete) m).success());
++ MerkleTrees trees = ((ValidationComplete) m).trees;
+
- assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
- assertEquals(tree.rowCount(), n);
++ Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++ while (iterator.hasNext())
++ {
++ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++ }
++ assertEquals(trees.rowCount(), n);
+ }
+
- private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++ private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+ {
- final SettableFuture<MessageOut> future = SettableFuture.create();
++ final CompletableFuture<MessageOut> future = new CompletableFuture<>();
MessagingService.instance().addMessageSink(new IMessageSink()
{
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
{
- try
- {
- if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
- {
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
- assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success());
- assertNull(((ValidationComplete) m).trees);
- }
- }
- finally
- {
- lock.signalAll();
- }
- future.set(message);
++ future.complete(message);
return false;
}
[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.X' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87825f82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87825f82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87825f82
Branch: refs/heads/trunk
Commit: 87825f8208fc21000a841f724c7f89f3702d8e70
Parents: c18968b e3b34dc
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 15:05:25 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 15:05:25 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 6 +-
.../org/apache/cassandra/repair/Validator.java | 9 +-
.../org/apache/cassandra/utils/MerkleTree.java | 10 ++
.../org/apache/cassandra/utils/MerkleTrees.java | 10 ++
.../db/compaction/CompactionsTest.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++-------
7 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87825f82/CHANGES.txt
----------------------------------------------------------------------