You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/08/06 15:29:18 UTC
[4/5] cassandra git commit: Repair common subranges of a set of nodes
in one session reviewed by Stefania Alborghetti for CASSANDRA-5220
Repair common subranges of a set of nodes in one session
reviewed by Stefania Alborghetti for CASSANDRA-5220
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0dd50a6c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0dd50a6c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0dd50a6c
Branch: refs/heads/trunk
Commit: 0dd50a6cdc81ec9ff1367238876d476affcf60e2
Parents: bf47408
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Thu Aug 6 08:23:10 2015 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Aug 6 08:23:20 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 19 +-
.../compaction/CompactionStrategyManager.java | 24 +-
.../apache/cassandra/net/MessagingService.java | 6 +
.../org/apache/cassandra/repair/RepairJob.java | 17 +-
.../apache/cassandra/repair/RepairJobDesc.java | 41 +-
.../repair/RepairMessageVerbHandler.java | 4 +-
.../apache/cassandra/repair/RepairRunnable.java | 45 +-
.../apache/cassandra/repair/RepairSession.java | 48 +-
.../cassandra/repair/RepairSessionResult.java | 6 +-
.../org/apache/cassandra/repair/SyncTask.java | 6 +-
.../repair/SystemDistributedKeyspace.java | 27 +-
.../apache/cassandra/repair/TreeResponse.java | 8 +-
.../apache/cassandra/repair/ValidationTask.java | 14 +-
.../org/apache/cassandra/repair/Validator.java | 83 +--
.../repair/messages/ValidationComplete.java | 47 +-
.../cassandra/service/ActiveRepairService.java | 4 +-
.../org/apache/cassandra/utils/MerkleTrees.java | 434 +++++++++++++++
.../serialization/3.0/gms.EndpointState.bin | Bin 0 -> 73 bytes
test/data/serialization/3.0/gms.Gossip.bin | Bin 0 -> 158 bytes
.../serialization/3.0/service.SyncComplete.bin | Bin 0 -> 362 bytes
.../serialization/3.0/service.SyncRequest.bin | Bin 0 -> 219 bytes
.../3.0/service.ValidationComplete.bin | Bin 0 -> 1251 bytes
.../3.0/service.ValidationRequest.bin | Bin 0 -> 167 bytes
.../cassandra/AbstractSerializationsTester.java | 3 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/repair/LocalSyncTaskTest.java | 23 +-
.../cassandra/repair/RepairSessionTest.java | 3 +-
.../apache/cassandra/repair/ValidatorTest.java | 19 +-
.../cassandra/service/SerializationsTest.java | 28 +-
.../apache/cassandra/utils/MerkleTreeTest.java | 1 -
.../apache/cassandra/utils/MerkleTreesTest.java | 538 +++++++++++++++++++
32 files changed, 1256 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d1ad0f..80e0e50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Repair improvements when using vnodes (CASSANDRA-5220)
* Disable scripted UDFs by default (CASSANDRA-9889)
* Add transparent data encryption core classes (CASSANDRA-9945)
* Bytecode inspection for Java-UDFs (CASSANDRA-9890)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 92cc249..8aa16d5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -69,6 +69,8 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
@@ -76,7 +78,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.UUIDGen;
@@ -1045,7 +1047,7 @@ public class CompactionManager implements CompactionManagerMBean
for (SSTableReader sstable : sstableCandidates.sstables)
{
- if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+ if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
{
sstablesToValidate.add(sstable);
}
@@ -1074,19 +1076,20 @@ public class CompactionManager implements CompactionManagerMBean
gcBefore = getDefaultGcBefore(cfs, nowInSec);
}
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
+ // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+ // We blindly assume that a partition is evenly distributed on all sstables for now.
long numPartitions = 0;
for (SSTableReader sstable : sstables)
{
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+ numPartitions += sstable.estimatedKeysForRanges(validator.desc.ranges);
}
// determine tree depth from number of partitions, but cap at 20 to prevent large tree.
int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+ tree.addMerkleTrees((int) Math.pow(2, depth), validator.desc.ranges);
long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range);
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
@@ -1119,7 +1122,7 @@ public class CompactionManager implements CompactionManagerMBean
duration,
depth,
numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
+ MerkleTrees.serializer.serializedSize(tree, 0),
validator.desc);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e5aff5d..4f6dfa2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
+import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -340,7 +341,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* @return
*/
@SuppressWarnings("resource")
- public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();
List<SSTableReader> unrepairedSSTables = new ArrayList<>();
@@ -352,19 +353,26 @@ public class CompactionStrategyManager implements INotificationConsumer
unrepairedSSTables.add(sstable);
}
+ Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
- AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
- AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+ for (Range<Token> range : ranges)
+ {
+ AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+ AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+
+ for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+ {
+ if (!scanners.add(scanner))
+ scanner.close();
+ }
+ }
- List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
- scanners.addAll(repairedScanners.scanners);
- scanners.addAll(unrepairedScanners.scanners);
- return new AbstractCompactionStrategy.ScannerList(scanners);
+ return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
}
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
{
- return getScanners(sstables, null);
+ return getScanners(sstables, Collections.singleton(null));
}
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 422fdb3..e10b4cb 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1111,6 +1111,12 @@ public final class MessagingService implements MessagingServiceMBean
return StorageService.instance.getTokenMetadata().partitioner;
}
+ public static void validatePartitioner(Collection<? extends AbstractBounds<?>> allBounds)
+ {
+ for (AbstractBounds<?> bounds : allBounds)
+ validatePartitioner(bounds);
+ }
+
public static void validatePartitioner(AbstractBounds<?> bounds)
{
if (globalPartitioner() != bounds.left.getPartitioner())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index ac20428..1e54f88 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -48,21 +48,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
*
* @param session RepairSession that this RepairJob belongs
* @param columnFamily name of the ColumnFamily to repair
- * @param parallelismDegree how to run repair job in parallel
- * @param repairedAt when the repair occurred (millis)
- * @param taskExecutor Executor to run various repair tasks
*/
- public RepairJob(RepairSession session,
- String columnFamily,
- RepairParallelism parallelismDegree,
- long repairedAt,
- ListeningExecutorService taskExecutor)
+ public RepairJob(RepairSession session, String columnFamily)
{
this.session = session;
- this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRange());
- this.repairedAt = repairedAt;
- this.taskExecutor = taskExecutor;
- this.parallelismDegree = parallelismDegree;
+ this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
+ this.repairedAt = session.repairedAt;
+ this.taskExecutor = session.taskExecutor;
+ this.parallelismDegree = session.parallelismDegree;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 1dd67c7..05adbf9 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.repair;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.UUID;
import com.google.common.base.Objects;
@@ -47,21 +49,21 @@ public class RepairJobDesc
public final String keyspace;
public final String columnFamily;
/** repairing range */
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
- public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+ public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Collection<Range<Token>> ranges)
{
this.parentSessionId = parentSessionId;
this.sessionId = sessionId;
this.keyspace = keyspace;
this.columnFamily = columnFamily;
- this.range = range;
+ this.ranges = ranges;
}
@Override
public String toString()
{
- return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + range + "]";
+ return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
}
@Override
@@ -74,7 +76,7 @@ public class RepairJobDesc
if (!columnFamily.equals(that.columnFamily)) return false;
if (!keyspace.equals(that.keyspace)) return false;
- if (range != null ? !range.equals(that.range) : that.range != null) return false;
+ if (ranges != null ? that.ranges == null || (ranges.size() != that.ranges.size()) || (ranges.size() == that.ranges.size() && !ranges.containsAll(that.ranges)) : that.ranges != null) return false;
if (!sessionId.equals(that.sessionId)) return false;
if (parentSessionId != null ? !parentSessionId.equals(that.parentSessionId) : that.parentSessionId != null) return false;
@@ -84,7 +86,7 @@ public class RepairJobDesc
@Override
public int hashCode()
{
- return Objects.hashCode(sessionId, keyspace, columnFamily, range);
+ return Objects.hashCode(sessionId, keyspace, columnFamily, ranges);
}
private static class RepairJobDescSerializer implements IVersionedSerializer<RepairJobDesc>
@@ -100,8 +102,10 @@ public class RepairJobDesc
UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
out.writeUTF(desc.keyspace);
out.writeUTF(desc.columnFamily);
- MessagingService.validatePartitioner(desc.range);
- AbstractBounds.tokenSerializer.serialize(desc.range, out, version);
+ MessagingService.validatePartitioner(desc.ranges);
+ out.writeInt(desc.ranges.size());
+ for (Range<Token> rt : desc.ranges)
+ AbstractBounds.tokenSerializer.serialize(rt, out, version);
}
public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException
@@ -115,8 +119,19 @@ public class RepairJobDesc
UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
- Range<Token> range = (Range<Token>)AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
- return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
+
+ int nRanges = in.readInt();
+ Collection<Range<Token>> ranges = new ArrayList<>();
+ Range<Token> range;
+
+ for (int i = 0; i < nRanges; i++)
+ {
+ range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in,
+ MessagingService.globalPartitioner(), version);
+ ranges.add(range);
+ }
+
+ return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, ranges);
}
public long serializedSize(RepairJobDesc desc, int version)
@@ -131,7 +146,11 @@ public class RepairJobDesc
size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
size += TypeSizes.sizeof(desc.keyspace);
size += TypeSizes.sizeof(desc.columnFamily);
- size += AbstractBounds.tokenSerializer.serializedSize(desc.range, version);
+ size += TypeSizes.sizeof(desc.ranges.size());
+ for (Range<Token> rt : desc.ranges)
+ {
+ size += AbstractBounds.tokenSerializer.serializedSize(rt, version);
+ }
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index d765ae6..28a3bf5 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -79,14 +79,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
case SNAPSHOT:
logger.debug("Snapshotting {}", desc);
ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
- final Range<Token> repairingRange = desc.range;
+ final Collection<Range<Token>> repairingRange = desc.ranges;
Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
{
public boolean apply(SSTableReader sstable)
{
return sstable != null &&
!sstable.metadata.isIndex() && // exclude SSTables from 2i
- new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange);
}
}, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 28511db..9401c03 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -146,17 +147,19 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
}
final Set<InetAddress> allNeighbors = new HashSet<>();
- Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+ List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>();
try
{
for (Range<Token> range : options.getRanges())
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
- options.getDataCenters(),
- options.getHosts());
- rangeToNeighbors.put(range, neighbors);
- allNeighbors.addAll(neighbors);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
+ options.getDataCenters(),
+ options.getHosts());
+
+ addRangeToNeighbors(commonRanges, range, neighbors);
+ allNeighbors.addAll(neighbors);
}
+
progress.incrementAndGet();
}
catch (IllegalArgumentException e)
@@ -210,13 +213,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
"internal"));
List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
- for (Range<Token> range : options.getRanges())
+ for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges)
{
final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
- range,
+ p.right,
keyspace,
options.getParallelism(),
- rangeToNeighbors.get(range),
+ p.left,
repairedAt,
executor,
cfnames);
@@ -228,7 +231,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
public void onSuccess(RepairSessionResult result)
{
String message = String.format("Repair session %s for range %s finished", session.getId(),
- session.getRange().toString());
+ session.getRanges().toString());
logger.info(message);
fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
progress.incrementAndGet(),
@@ -239,7 +242,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
public void onFailure(Throwable t)
{
String message = String.format("Repair session %s for range %s failed with error %s",
- session.getId(), session.getRange().toString(), t.getMessage());
+ session.getId(), session.getRanges().toString(), t.getMessage());
logger.error(message, t);
fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
progress.incrementAndGet(),
@@ -265,7 +268,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
{
if (sessionResult != null)
{
- successfulRanges.add(sessionResult.range);
+ successfulRanges.addAll(sessionResult.ranges);
}
else
{
@@ -325,6 +328,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
});
}
+ private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
+ {
+ for (int i = 0; i < neighborRangeList.size(); i++)
+ {
+ Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
+
+ if (p.left.containsAll(neighbors))
+ {
+ p.right.add(range);
+ return;
+ }
+ }
+
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(range);
+ neighborRangeList.add(Pair.create(neighbors, ranges));
+ }
+
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
return new Thread(new WrappedRunnable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..a52b352 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -37,13 +37,13 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
/**
- * Coordinates the (active) repair of a token range.
+ * Coordinates the (active) repair of a list of non-overlapping token ranges.
*
- * A given RepairSession repairs a set of replicas for a given range on a list
+ * A given RepairSession repairs a set of replicas for a given set of ranges on a list
* of column families. For each of the column family to repair, RepairSession
* creates a {@link RepairJob} that handles the repair of that CF.
*
@@ -64,11 +64,11 @@ import org.apache.cassandra.utils.Pair;
* A given session will execute the first phase (validation phase) of each of it's job
* sequentially. In other words, it will start the first job and only start the next one
* once that first job validation phase is complete. This is done so that the replica only
- * create one merkle tree at a time, which is our way to ensure that such creation starts
+ * creates one merkle tree per range at a time, which is our way to ensure that such creation starts
* roughly at the same time on every node (see CASSANDRA-2816). However the synchronization
* phases are allowed to run concurrently (with each other and with validation phases).
*
- * A given RepairJob has 2 modes: either sequential or not (isSequential flag). If sequential,
+ * A given RepairJob has 2 modes: either sequential or not (RepairParallelism). If sequential,
* it will requests merkle tree creation from each replica in sequence (though in that case
* we still first send a message to each node to flush and snapshot data so each merkle tree
* creation is still done on similar data, even if the actual creation is not
@@ -88,9 +88,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
/** Range to repair */
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
- private final long repairedAt;
+ public final long repairedAt;
// number of validations left to be performed
private final AtomicInteger validationRemaining;
@@ -103,7 +103,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
// Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
- private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+ public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
private volatile boolean terminated = false;
@@ -112,7 +112,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
*
* @param parentRepairSession the parent sessions id
* @param id this sessions id
- * @param range range to repair
+ * @param ranges ranges to repair
* @param keyspace name of keyspace
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
* @param endpoints the data centers that should be part of the repair; null for all DCs
@@ -121,7 +121,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
*/
public RepairSession(UUID parentRepairSession,
UUID id,
- Range<Token> range,
+ Collection<Range<Token>> ranges,
String keyspace,
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints,
@@ -135,7 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
- this.range = range;
+ this.ranges = ranges;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
this.validationRemaining = new AtomicInteger(cfnames.length);
@@ -146,9 +146,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
return id;
}
- public Range<Token> getRange()
+ public Collection<Range<Token>> getRanges()
{
- return range;
+ return ranges;
}
public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task)
@@ -166,9 +166,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
*
* @param desc repair job description
* @param endpoint endpoint that sent merkle tree
- * @param tree calculated merkle tree, or null if validation failed
+ * @param trees calculated merkle trees, or null if validation failed
*/
- public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree)
+ public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees)
{
ValidationTask task = validating.remove(Pair.create(desc, endpoint));
if (task == null)
@@ -180,7 +180,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
- task.treeReceived(tree);
+ task.treesReceived(trees);
// Unregister from FailureDetector once we've completed synchronizing Merkle trees.
// After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
@@ -234,15 +234,15 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
if (terminated)
return;
- logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames)));
- Tracing.traceRepair("Syncing range {}", range);
- SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, range, endpoints);
+ logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames)));
+ Tracing.traceRepair("Syncing range {}", ranges);
+ SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints);
if (endpoints.isEmpty())
{
- logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", range));
+ logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
Tracing.traceRepair(message);
- set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList()));
+ set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList()));
SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
return;
}
@@ -265,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length);
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(this, cfname, parallelismDegree, repairedAt, taskExecutor);
+ RepairJob job = new RepairJob(this, cfname);
executor.execute(job);
jobs.add(job);
}
@@ -277,8 +277,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
// this repair session is completed
logger.info("[repair #{}] {}", getId(), "Session completed successfully");
- Tracing.traceRepair("Completed sync of range {}", range);
- set(new RepairSessionResult(id, keyspace, range, results));
+ Tracing.traceRepair("Completed sync of range {}", ranges);
+ set(new RepairSessionResult(id, keyspace, ranges, results));
taskExecutor.shutdown();
// mark this session as terminated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
index 4551608..d4fff37 100644
--- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -30,14 +30,14 @@ public class RepairSessionResult
{
public final UUID sessionId;
public final String keyspace;
- public final Range<Token> range;
+ public final Collection<Range<Token>> ranges;
public final Collection<RepairResult> repairJobResults;
- public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults)
+ public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults)
{
this.sessionId = sessionId;
this.keyspace = keyspace;
- this.range = range;
+ this.ranges = ranges;
this.repairJobResults = repairJobResults;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java
index 7350a66..8adec6f 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.util.ArrayList;
import java.util.List;
import com.google.common.util.concurrent.AbstractFuture;
@@ -27,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* SyncTask will calculate the difference of MerkleTree between two nodes
@@ -56,8 +55,7 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna
public void run()
{
// compare trees, and collect differences
- List<Range<Token>> differences = new ArrayList<>();
- differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+ List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 70e74db..9cf6c3e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -130,7 +130,7 @@ public final class SystemDistributedKeyspace
processSilent(fmtQuery);
}
- public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Range<Token> range, Iterable<InetAddress> endpoints)
+ public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
{
String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
Set<String> participants = Sets.newHashSet(coordinator);
@@ -144,17 +144,20 @@ public final class SystemDistributedKeyspace
for (String cfname : cfnames)
{
- String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
- keyspaceName,
- cfname,
- id.toString(),
- parent_id.toString(),
- range.left.toString(),
- range.right.toString(),
- coordinator,
- Joiner.on("', '").join(participants),
- RepairState.STARTED.toString());
- processSilent(fmtQry);
+ for (Range<Token> range : ranges)
+ {
+ String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
+ keyspaceName,
+ cfname,
+ id.toString(),
+ parent_id.toString(),
+ range.left.toString(),
+ range.right.toString(),
+ coordinator,
+ Joiner.on("', '").join(participants),
+ RepairState.STARTED.toString());
+ processSilent(fmtQry);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
index eede4ee..c898b36 100644
--- a/src/java/org/apache/cassandra/repair/TreeResponse.java
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* Merkle tree response sent from given endpoint.
@@ -27,11 +27,11 @@ import org.apache.cassandra.utils.MerkleTree;
public class TreeResponse
{
public final InetAddress endpoint;
- public final MerkleTree tree;
+ public final MerkleTrees trees;
- public TreeResponse(InetAddress endpoint, MerkleTree tree)
+ public TreeResponse(InetAddress endpoint, MerkleTrees trees)
{
this.endpoint = endpoint;
- this.tree = tree;
+ this.trees = trees;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index a52ec4f..bd866d2 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -18,13 +18,17 @@
package org.apache.cassandra.repair;
import java.net.InetAddress;
+import java.util.Map;
import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* ValidationTask sends {@link ValidationRequest} to a replica.
@@ -53,19 +57,19 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn
}
/**
- * Receive MerkleTree from replica node.
+ * Receive MerkleTrees from replica node.
*
- * @param tree MerkleTree that is sent from replica. Null if validation failed on replica node.
+ * @param trees MerkleTrees that is sent from replica. Null if validation failed on replica node.
*/
- public void treeReceived(MerkleTree tree)
+ public void treesReceived(MerkleTrees trees)
{
- if (tree == null)
+ if (trees == null)
{
setException(new RepairException(desc, "Validation failed in " + endpoint));
}
else
{
- set(new TreeResponse(endpoint, tree));
+ set(new TreeResponse(endpoint, trees));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 87d186c..7d6c787 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -33,12 +33,15 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTree.RowHash;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* Handles the building of a merkle tree for a column family.
@@ -58,11 +61,11 @@ public class Validator implements Runnable
// null when all rows with the min token have been consumed
private long validated;
- private MerkleTree tree;
+ private MerkleTrees trees;
// current range being updated
private MerkleTree.TreeRange range;
// iterator for iterating sub ranges (MT's leaves)
- private MerkleTree.TreeRangeIterator ranges;
+ private MerkleTrees.TreeRangeIterator ranges;
// last key seen
private DecoratedKey lastKey;
@@ -76,9 +79,9 @@ public class Validator implements Runnable
ranges = null;
}
- public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
+ public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
{
- this.tree = tree;
+ this.trees = tree;
if (!tree.partitioner().preservesOrder())
{
@@ -87,32 +90,35 @@ public class Validator implements Runnable
}
else
{
- List<DecoratedKey> keys = new ArrayList<>();
- for (DecoratedKey sample : cfs.keySamples(desc.range))
+ for (Range<Token> range : tree.ranges())
{
- assert desc.range.contains(sample.getToken()): "Token " + sample.getToken() + " is not within range " + desc.range;
- keys.add(sample);
- }
+ List<DecoratedKey> keys = new ArrayList<>();
+ for (DecoratedKey sample : cfs.keySamples(range))
+ {
+ assert range.contains(sample.getToken()) : "Token " + sample.getToken() + " is not within range " + desc.ranges;
+ keys.add(sample);
+ }
- if (keys.isEmpty())
- {
- // use an even tree distribution
- tree.init();
- }
- else
- {
- int numkeys = keys.size();
- Random random = new Random();
- // sample the column family using random keys from the index
- while (true)
+ if (keys.isEmpty())
+ {
+ // use an even tree distribution
+ tree.init(range);
+ }
+ else
{
- DecoratedKey dk = keys.get(random.nextInt(numkeys));
- if (!tree.split(dk.getToken()))
- break;
+ int numKeys = keys.size();
+ Random random = new Random();
+ // sample the column family using random keys from the index
+ while (true)
+ {
+ DecoratedKey dk = keys.get(random.nextInt(numKeys));
+ if (!tree.split(dk.getToken()))
+ break;
+ }
}
}
}
- logger.debug("Prepared AEService tree of size {} for {}", tree.size(), desc);
+ logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc);
ranges = tree.invalids();
}
@@ -124,7 +130,7 @@ public class Validator implements Runnable
*/
public void add(UnfilteredRowIterator partition)
{
- assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range;
+ assert Range.isInRanges(partition.partitionKey().getToken(), desc.ranges) : partition.partitionKey().getToken() + " is not contained in " + desc.ranges;
assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
: "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
lastKey = partition.partitionKey();
@@ -133,13 +139,14 @@ public class Validator implements Runnable
range = ranges.next();
// generate new ranges as long as case 1 is true
- while (!range.contains(lastKey.getToken()))
+ if (!findCorrectRange(lastKey.getToken()))
{
// add the empty hash, and move to the next range
- range.ensureHashInitialised();
- range = ranges.next();
+ ranges = trees.invalids();
+ findCorrectRange(lastKey.getToken());
}
+ assert range.contains(lastKey.getToken()) : "Token not in MerkleTree: " + lastKey.getToken();
// case 3 must be true: mix in the hashed row
RowHash rowHash = rowHash(partition);
if (rowHash != null)
@@ -148,6 +155,16 @@ public class Validator implements Runnable
}
}
+ public boolean findCorrectRange(Token t)
+ {
+ while (!range.contains(t) && ranges.hasNext())
+ {
+ range = ranges.next();
+ }
+
+ return range.contains(t);
+ }
+
static class CountingDigest extends MessageDigest
{
private long count;
@@ -212,9 +229,9 @@ public class Validator implements Runnable
{
// log distribution of rows in tree
logger.debug("Validated {} partitions for {}. Partitions per leaf are:", validated, desc.sessionId);
- tree.histogramOfRowCountPerLeaf().log(logger);
+ trees.logRowCountPerLeaf(logger);
logger.debug("Validated {} partitions for {}. Partition sizes are:", validated, desc.sessionId);
- tree.histogramOfRowSizePerLeaf().log(logger);
+ trees.logRowSizePerLeaf(logger);
}
}
@@ -223,8 +240,8 @@ public class Validator implements Runnable
{
assert ranges != null : "Validator was not prepared()";
- if (range != null)
- range.ensureHashInitialised();
+ ranges = trees.invalids();
+
while (ranges.hasNext())
{
range = ranges.next();
@@ -255,6 +272,6 @@ public class Validator implements Runnable
logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
}
- MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator);
+ MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index ef0c4ec..90be8e5 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.RepairJobDesc;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* ValidationComplete message is sent when validation compaction completed successfully.
@@ -34,24 +34,25 @@ public class ValidationComplete extends RepairMessage
{
public static MessageSerializer serializer = new ValidationCompleteSerializer();
- /** true if validation success, false otherwise */
- public final boolean success;
/** Merkle hash tree response. Null if validation failed. */
- public final MerkleTree tree;
+ public final MerkleTrees trees;
public ValidationComplete(RepairJobDesc desc)
{
super(Type.VALIDATION_COMPLETE, desc);
- this.success = false;
- this.tree = null;
+ trees = null;
}
- public ValidationComplete(RepairJobDesc desc, MerkleTree tree)
+ public ValidationComplete(RepairJobDesc desc, MerkleTrees trees)
{
super(Type.VALIDATION_COMPLETE, desc);
- assert tree != null;
- this.success = true;
- this.tree = tree;
+ assert trees != null;
+ this.trees = trees;
+ }
+
+ public boolean success()
+ {
+ return trees != null;
}
private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
@@ -59,31 +60,31 @@ public class ValidationComplete extends RepairMessage
public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
- out.writeBoolean(message.success);
- if (message.success)
- MerkleTree.serializer.serialize(message.tree, out, version);
+ out.writeBoolean(message.success());
+ if (message.trees != null)
+ MerkleTrees.serializer.serialize(message.trees, out, version);
}
public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
- if (in.readBoolean())
- {
- MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
- return new ValidationComplete(desc, tree);
- }
- else
+ boolean success = in.readBoolean();
+
+ if (success)
{
- return new ValidationComplete(desc);
+ MerkleTrees trees = MerkleTrees.serializer.deserialize(in, version);
+ return new ValidationComplete(desc, trees);
}
+
+ return new ValidationComplete(desc);
}
public long serializedSize(ValidationComplete message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
- size += TypeSizes.sizeof(message.success);
- if (message.success)
- size += MerkleTree.serializer.serializedSize(message.tree, version);
+ size += TypeSizes.sizeof(message.success());
+ if (message.trees != null)
+ size += MerkleTrees.serializer.serializedSize(message.trees, version);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 213edeb..e75d13e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -103,7 +103,7 @@ public class ActiveRepairService
* @return Future for asynchronous call or null if there is no need to repair
*/
public RepairSession submitRepairSession(UUID parentRepairSession,
- Range<Token> range,
+ Collection<Range<Token>> range,
String keyspace,
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints,
@@ -383,7 +383,7 @@ public class ActiveRepairService
{
case VALIDATION_COMPLETE:
ValidationComplete validation = (ValidationComplete) message;
- session.validationComplete(desc, endpoint, validation.tree);
+ session.validationComplete(desc, endpoint, validation.trees);
break;
case SYNC_COMPLETE:
// one of replica is synced.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java
new file mode 100644
index 0000000..43c023e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+
+/**
+ * Wrapper class for handling of multiple MerkleTrees at once.
+ *
+ * The MerkleTrees are keyed by non-overlapping token ranges: each key is the full range of the
+ * MerkleTree it maps to, and the map is ordered by {@link TokenRangeComparator}.
+ */
+public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
+{
+    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
+
+    private final Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+
+    private final IPartitioner partitioner;
+
+    /**
+     * Creates empty MerkleTrees object.
+     *
+     * @param partitioner The partitioner to use
+     */
+    public MerkleTrees(IPartitioner partitioner)
+    {
+        this(partitioner, new ArrayList<>());
+    }
+
+    /**
+     * Creates a MerkleTrees object holding the given trees.
+     *
+     * @param partitioner The partitioner to use
+     * @param merkleTrees The trees to add; their full ranges must not overlap
+     */
+    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
+    {
+        this.partitioner = partitioner;
+        addTrees(merkleTrees);
+    }
+
+    /**
+     * Get the ranges that these merkle trees cover.
+     *
+     * @return A read-only view of the covered ranges, in map order
+     */
+    public Collection<Range<Token>> ranges()
+    {
+        return Collections.unmodifiableCollection(merkleTrees.keySet());
+    }
+
+    /**
+     * Get the partitioner in use.
+     *
+     * @return The partitioner; may be null for a deserialized instance with no trees
+     */
+    public IPartitioner partitioner()
+    {
+        return partitioner;
+    }
+
+    /**
+     * Add merkle trees with the defined maxsize and ranges.
+     *
+     * @param maxsize The maximum size of each created tree
+     * @param ranges The ranges to create trees for; must not overlap each other or existing ranges
+     */
+    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
+    {
+        for (Range<Token> range : ranges)
+        {
+            addMerkleTree(maxsize, range);
+        }
+    }
+
+    /**
+     * Add a MerkleTree with the defined size and range.
+     *
+     * @param maxsize The maximum size of the tree
+     * @param range The range the tree covers
+     * @return The created merkle tree.
+     */
+    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
+    {
+        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
+    }
+
+    @VisibleForTesting
+    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
+    {
+        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
+        addTree(tree);
+
+        return tree;
+    }
+
+    /**
+     * Get the MerkleTree.TreeRange responsible for the given token.
+     *
+     * @param t The token to look up
+     * @return The tree range containing t
+     * @throws NullPointerException if no tree covers t
+     */
+    public MerkleTree.TreeRange get(Token t)
+    {
+        return getMerkleTree(t).get(t);
+    }
+
+    /**
+     * Init all MerkleTrees with an even tree distribution.
+     */
+    public void init()
+    {
+        for (Range<Token> range : merkleTrees.keySet())
+        {
+            init(range);
+        }
+    }
+
+    /**
+     * Init a selected MerkleTree with an even tree distribution.
+     *
+     * @param range The full range of the tree to initialise
+     */
+    public void init(Range<Token> range)
+    {
+        merkleTrees.get(range).init();
+    }
+
+    /**
+     * Split the MerkleTree responsible for the given token.
+     *
+     * @param t The token to split at
+     * @return The result of {@link MerkleTree#split(Token)} on the owning tree
+     * @throws NullPointerException if no tree covers t
+     */
+    public boolean split(Token t)
+    {
+        return getMerkleTree(t).split(t);
+    }
+
+    /**
+     * Invalidate the MerkleTree responsible for the given token.
+     *
+     * @param t The token whose range should be invalidated
+     * @throws NullPointerException if no tree covers t
+     */
+    public void invalidate(Token t)
+    {
+        getMerkleTree(t).invalidate(t);
+    }
+
+    /**
+     * Get the MerkleTree responsible for the given token range.
+     *
+     * @param range The range to look up (matched through the map's comparator)
+     * @return The matching tree, or null if none exists
+     */
+    public MerkleTree getMerkleTree(Range<Token> range)
+    {
+        return merkleTrees.get(range);
+    }
+
+    /**
+     * @return The sum of the sizes of all contained trees
+     */
+    public long size()
+    {
+        long size = 0;
+
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            size += tree.size();
+        }
+
+        return size;
+    }
+
+    @VisibleForTesting
+    public void maxsize(Range<Token> range, int maxsize)
+    {
+        getMerkleTree(range).maxsize(maxsize);
+    }
+
+    /**
+     * Get the MerkleTree responsible for the given token.
+     *
+     * @param t The token to look up
+     * @return The given MerkleTree or null if none exist.
+     */
+    private MerkleTree getMerkleTree(Token t)
+    {
+        for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
+        {
+            if (entry.getKey().contains(t))
+                return entry.getValue();
+        }
+
+        return null;
+    }
+
+    private void addTrees(Collection<MerkleTree> trees)
+    {
+        for (MerkleTree tree : trees)
+        {
+            addTree(tree);
+        }
+    }
+
+    private void addTree(MerkleTree tree)
+    {
+        assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
+
+        merkleTrees.put(tree.fullRange, tree);
+    }
+
+    private boolean validateNonOverlapping(MerkleTree tree)
+    {
+        for (Range<Token> range : merkleTrees.keySet())
+        {
+            if (tree.fullRange.intersects(range))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Get an iterator for all the invalids generated by the MerkleTrees.
+     *
+     * @return An iterator over the invalid ranges of every tree, tree by tree in map order
+     */
+    public TreeRangeIterator invalids()
+    {
+        return new TreeRangeIterator();
+    }
+
+    /**
+     * Log the row count per leaf for all MerkleTrees.
+     *
+     * @param logger The logger to write the histograms to
+     */
+    public void logRowCountPerLeaf(Logger logger)
+    {
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            tree.histogramOfRowCountPerLeaf().log(logger);
+        }
+    }
+
+    /**
+     * Log the row size per leaf for all MerkleTrees.
+     *
+     * @param logger The logger to write the histograms to
+     */
+    public void logRowSizePerLeaf(Logger logger)
+    {
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            tree.histogramOfRowSizePerLeaf().log(logger);
+        }
+    }
+
+    /**
+     * Concatenate the hashes of the given range from every tree that intersects it.
+     *
+     * @param range The range to hash
+     * @return The concatenated hashes, or null if no intersecting tree produced a hash
+     */
+    @VisibleForTesting
+    public byte[] hash(Range<Token> range)
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        boolean hashed = false;
+
+        for (Map.Entry<Range<Token>, MerkleTree> entry : merkleTrees.entrySet())
+        {
+            if (!entry.getKey().intersects(range))
+                continue;
+
+            byte[] bytes = entry.getValue().hash(range);
+            if (bytes != null)
+            {
+                // The (array, offset, length) overload does not declare IOException, unlike write(byte[]).
+                baos.write(bytes, 0, bytes.length);
+                hashed = true;
+            }
+        }
+
+        return hashed ? baos.toByteArray() : null;
+    }
+
+    /**
+     * Get an iterator of all ranges and their MerkleTrees.
+     */
+    @Override
+    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
+    {
+        return merkleTrees.entrySet().iterator();
+    }
+
+    /**
+     * Iterates over the invalid ranges of every contained MerkleTree, one tree after another.
+     */
+    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
+            Iterable<MerkleTree.TreeRange>,
+            PeekingIterator<MerkleTree.TreeRange>
+    {
+        private final Iterator<MerkleTree> it;
+
+        private MerkleTree.TreeRangeIterator current = null;
+
+        private TreeRangeIterator()
+        {
+            it = merkleTrees.values().iterator();
+        }
+
+        @Override
+        public MerkleTree.TreeRange computeNext()
+        {
+            // A single tree's invalids may be empty (presumably once all its leaves are hashed), so keep
+            // advancing to the next tree instead of calling next() on an exhausted iterator.
+            while (current == null || !current.hasNext())
+            {
+                if (!it.hasNext())
+                    return endOfData();
+
+                current = it.next().invalids();
+            }
+
+            return current.next();
+        }
+
+        @Override
+        public Iterator<MerkleTree.TreeRange> iterator()
+        {
+            return this;
+        }
+    }
+
+    /**
+     * Get the differences between the two sets of MerkleTrees.
+     *
+     * @param ltree The first set of trees
+     * @param rtree The set of trees to compare against; must contain a tree for every range of ltree
+     * @return The differing ranges, collected over all trees of ltree
+     * @throws IllegalArgumentException if rtree has no tree matching a range of ltree
+     */
+    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
+    {
+        List<Range<Token>> differences = new ArrayList<>();
+        for (MerkleTree tree : ltree.merkleTrees.values())
+        {
+            MerkleTree other = rtree.getMerkleTree(tree.fullRange);
+            if (other == null)
+                throw new IllegalArgumentException("No MerkleTree matching range " + tree.fullRange + " to compare against");
+
+            differences.addAll(MerkleTree.difference(tree, other));
+        }
+        return differences;
+    }
+
+    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
+    {
+        @Override
+        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeInt(trees.merkleTrees.size());
+            for (MerkleTree tree : trees.merkleTrees.values())
+            {
+                MerkleTree.serializer.serialize(tree, out, version);
+            }
+        }
+
+        @Override
+        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
+        {
+            IPartitioner partitioner = null;
+            int nTrees = in.readInt();
+            Collection<MerkleTree> trees = new ArrayList<>(nTrees);
+            for (int i = 0; i < nTrees; i++)
+            {
+                MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+                trees.add(tree);
+
+                if (partitioner == null)
+                    partitioner = tree.partitioner();
+                else
+                    assert tree.partitioner() == partitioner;
+            }
+
+            return new MerkleTrees(partitioner, trees);
+        }
+
+        @Override
+        public long serializedSize(MerkleTrees trees, int version)
+        {
+            assert trees != null;
+
+            long size = TypeSizes.sizeof(trees.merkleTrees.size());
+            for (MerkleTree tree : trees.merkleTrees.values())
+            {
+                size += MerkleTree.serializer.serializedSize(tree, version);
+            }
+            return size;
+        }
+    }
+
+    /**
+     * Orders ranges for the tree map: ranges with equal left tokens are treated as the same key,
+     * otherwise ordering is delegated to {@link Range#compareTo}.
+     */
+    private static class TokenRangeComparator implements Comparator<Range<Token>>
+    {
+        @Override
+        public int compare(Range<Token> rt1, Range<Token> rt2)
+        {
+            if (rt1.left.compareTo(rt2.left) == 0)
+                return 0;
+
+            return rt1.compareTo(rt2);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/gms.EndpointState.bin b/test/data/serialization/3.0/gms.EndpointState.bin
new file mode 100644
index 0000000..a230ae1
Binary files /dev/null and b/test/data/serialization/3.0/gms.EndpointState.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/gms.Gossip.bin b/test/data/serialization/3.0/gms.Gossip.bin
new file mode 100644
index 0000000..af5ac57
Binary files /dev/null and b/test/data/serialization/3.0/gms.Gossip.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.SyncComplete.bin b/test/data/serialization/3.0/service.SyncComplete.bin
new file mode 100644
index 0000000..73ea4b4
Binary files /dev/null and b/test/data/serialization/3.0/service.SyncComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.SyncRequest.bin b/test/data/serialization/3.0/service.SyncRequest.bin
new file mode 100644
index 0000000..7e09777
Binary files /dev/null and b/test/data/serialization/3.0/service.SyncRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.ValidationComplete.bin b/test/data/serialization/3.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..b8f0fb9
Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.ValidationRequest.bin b/test/data/serialization/3.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..a00763b
Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 4ee5ce4..501f4ae 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -37,7 +37,7 @@ import java.util.Map;
public class AbstractSerializationsTester
{
- protected static final String CUR_VER = System.getProperty("cassandra.version", "2.1");
+ protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0");
protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
{{
put("0.7", 1);
@@ -46,6 +46,7 @@ public class AbstractSerializationsTester
put("2.0", MessagingService.VERSION_20);
put("2.1", MessagingService.VERSION_21);
put("2.2", MessagingService.VERSION_22);
+ put("3.0", MessagingService.VERSION_30);
}};
protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index c3be08a..46c7068 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -173,7 +173,7 @@ public class LeveledCompactionStrategyTest
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
UUID parentRepSession = UUID.randomUUID();
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false);
- RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
+ RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 77a6ac4..db3f683 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -37,6 +38,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
import static org.junit.Assert.assertEquals;
@@ -65,10 +67,11 @@ public class LocalSyncTaskTest extends SchemaLoader
final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
- RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+ MerkleTrees tree1 = createInitialTree(desc);
- MerkleTree tree1 = createInitialTree(desc);
- MerkleTree tree2 = createInitialTree(desc);
+ MerkleTrees tree2 = createInitialTree(desc);
// difference the trees
// note: we reuse the same endpoint which is bogus in theory but fine here
@@ -90,10 +93,11 @@ public class LocalSyncTaskTest extends SchemaLoader
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false);
- RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+ RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+ MerkleTrees tree1 = createInitialTree(desc);
- MerkleTree tree1 = createInitialTree(desc);
- MerkleTree tree2 = createInitialTree(desc);
+ MerkleTrees tree2 = createInitialTree(desc);
// change a range in one of the trees
Token token = partirioner.midpoint(range.left, range.right);
@@ -115,9 +119,10 @@ public class LocalSyncTaskTest extends SchemaLoader
assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
}
- private MerkleTree createInitialTree(RepairJobDesc desc)
+ private MerkleTrees createInitialTree(RepairJobDesc desc)
{
- MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
+ MerkleTrees tree = new MerkleTrees(partirioner);
+ tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
tree.init();
for (MerkleTree.TreeRange r : tree.invalids())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 0af94b2..d40982c 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -53,7 +54,7 @@ public class RepairSessionTest
IPartitioner p = Murmur3Partitioner.instance;
Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
Set<InetAddress> endpoints = Sets.newHashSet(remote);
- RepairSession session = new RepairSession(parentSessionId, sessionId, repairRange, "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
+ RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
// perform convict
session.convict(remote, Double.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index d77daf0..8fe76c3 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.repair;
import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.junit.After;
@@ -43,6 +46,7 @@ import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.junit.Assert.assertEquals;
@@ -77,7 +81,7 @@ public class ValidatorTest
public void testValidatorComplete() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
final SimpleCondition lock = new SimpleCondition();
MessagingService.instance().addMessageSink(new IMessageSink()
@@ -91,8 +95,8 @@ public class ValidatorTest
RepairMessage m = (RepairMessage) message.payload;
assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success);
- assertNotNull(((ValidationComplete) m).tree);
+ assertTrue(((ValidationComplete) m).success());
+ assertNotNull(((ValidationComplete) m).trees);
}
}
finally
@@ -113,7 +117,8 @@ public class ValidatorTest
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
Validator validator = new Validator(desc, remote, 0);
- MerkleTree tree = new MerkleTree(partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+ MerkleTrees tree = new MerkleTrees(partitioner);
+ tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges);
validator.prepare(cfs, tree);
// and confirm that the tree was split
@@ -137,7 +142,7 @@ public class ValidatorTest
public void testValidatorFailed() throws Throwable
{
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+ final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
final SimpleCondition lock = new SimpleCondition();
MessagingService.instance().addMessageSink(new IMessageSink()
@@ -151,8 +156,8 @@ public class ValidatorTest
RepairMessage m = (RepairMessage) message.payload;
assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success);
- assertNull(((ValidationComplete) m).tree);
+ assertFalse(((ValidationComplete) m).success());
+ assertNull(((ValidationComplete) m).trees);
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 80bb452..847bcea 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
@@ -43,7 +44,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
public class SerializationsTest extends AbstractSerializationsTester
{
@@ -58,7 +59,7 @@ public class SerializationsTest extends AbstractSerializationsTester
partitionerSwitcher = Util.switchPartitioner(RandomPartitioner.instance);
RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken());
- DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
+ DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", Arrays.asList(FULL_RANGE));
}
@AfterClass
@@ -66,8 +67,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
partitionerSwitcher.close();
}
-
-
+
private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
{
try (DataOutputStreamPlus out = getOutput(fileName))
@@ -109,13 +109,17 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testValidationCompleteWrite() throws IOException
{
IPartitioner p = RandomPartitioner.instance;
+
+ MerkleTrees mt = new MerkleTrees(p);
+
// empty validation
- MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+ mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE);
Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
ValidationComplete c0 = new ValidationComplete(DESC, mt);
// validation with a tree
- mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+ mt = new MerkleTrees(p);
+ mt.addMerkleTree(Integer.MAX_VALUE, FULL_RANGE);
for (int i = 0; i < 10; i++)
mt.split(p.getRandomToken());
Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
@@ -140,24 +144,24 @@ public class SerializationsTest extends AbstractSerializationsTester
assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
assert DESC.equals(message.desc);
- assert ((ValidationComplete) message).success;
- assert ((ValidationComplete) message).tree != null;
+ assert ((ValidationComplete) message).success();
+ assert ((ValidationComplete) message).trees != null;
// validation with a tree
message = RepairMessage.serializer.deserialize(in, getVersion());
assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
assert DESC.equals(message.desc);
- assert ((ValidationComplete) message).success;
- assert ((ValidationComplete) message).tree != null;
+ assert ((ValidationComplete) message).success();
+ assert ((ValidationComplete) message).trees != null;
// failed validation
message = RepairMessage.serializer.deserialize(in, getVersion());
assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
assert DESC.equals(message.desc);
- assert !((ValidationComplete) message).success;
- assert ((ValidationComplete) message).tree == null;
+ assert !((ValidationComplete) message).success();
+ assert ((ValidationComplete) message).trees == null;
// MessageOuts
for (int i = 0; i < 3; i++)