You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/01/30 16:24:02 UTC
git commit: Add command to rebuild node without merkle tree
calculations
Updated Branches:
refs/heads/trunk 8a5436a2b -> 2deee7a4e
Add command to rebuild node without merkle tree calculations
patch by scode; reviewed by jbellis and slebresne for CASSANDRA-3483
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2deee7a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2deee7a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2deee7a4
Branch: refs/heads/trunk
Commit: 2deee7a4e70c3e17344f7bbd53e9a72a8ac99dba
Parents: 8a5436a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jan 30 16:22:39 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jan 30 16:22:39 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/dht/BootStrapper.java | 105 +------
.../org/apache/cassandra/dht/RangeStreamer.java | 245 +++++++++++++++
.../apache/cassandra/service/StorageService.java | 19 +-
.../cassandra/service/StorageServiceMBean.java | 9 +
.../apache/cassandra/streaming/OperationType.java | 3 +-
.../cassandra/streaming/StreamInSession.java | 2 +-
src/java/org/apache/cassandra/tools/NodeCmd.java | 6 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/dht/BootStrapperTest.java | 43 ++--
10 files changed, 318 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a303f5..eec1583 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
* Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
* Allow extending CompositeType comparator (CASSANDRA-3657)
* Avoids over-paging during get_count (CASSANDRA-3798)
+ * Add new command to rebuild a node without (repair) merkle tree calculations
+ (CASSANDRA-3483)
1.0.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 5b84867..6108214 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -58,9 +58,9 @@ public class BootStrapper
{
private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
- /* endpoints that need to be bootstrapped */
+ /* endpoint that needs to be bootstrapped */
protected final InetAddress address;
- /* tokens of the nodes being bootstrapped. */
+ /* token of the node being bootstrapped. */
protected final Token<?> token;
protected final TokenMetadata tokenMetadata;
private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
@@ -80,52 +80,17 @@ public class BootStrapper
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process");
- final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, OperationType.BOOTSTRAP);
+ streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
- int requests = 0;
for (String table : Schema.instance.getNonSystemTables())
{
- Map<InetAddress, Collection<Range<Token>>> workMap = getWorkMap(getRangesWithSources(table)).asMap();
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : workMap.entrySet())
- {
- requests++;
- rangesToFetch.put(table, entry);
- }
- }
-
- final CountDownLatch latch = new CountDownLatch(requests);
- for (final String table : rangesToFetch.keySet())
- {
- /* Send messages to respective folks to stream data over to me */
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(table))
- {
- final InetAddress source = entry.getKey();
- Collection<Range<Token>> ranges = entry.getValue();
- final Runnable callback = new Runnable()
- {
- public void run()
- {
- latch.countDown();
- if (logger.isDebugEnabled())
- logger.debug(String.format("Removed %s/%s as a bootstrap source; remaining is %s",
- source, table, latch.getCount()));
- }
- };
- if (logger.isDebugEnabled())
- logger.debug("Bootstrapping from " + source + " ranges " + StringUtils.join(ranges, ", "));
- StreamIn.requestRanges(source, table, ranges, callback, OperationType.BOOTSTRAP);
- }
+ AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+ streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, token, address));
}
- try
- {
- latch.await();
- StorageService.instance.finishBootstrapping();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ streamer.fetch();
+ StorageService.instance.finishBootstrapping();
}
/**
@@ -197,31 +162,6 @@ public class BootStrapper
return maxEndpoint;
}
- /** get potential sources for each range, ordered by proximity (as determined by EndpointSnitch) */
- Multimap<Range<Token>, InetAddress> getRangesWithSources(String table)
- {
- assert tokenMetadata.sortedTokens().size() > 0;
- final AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
- Collection<Range<Token>> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address);
-
- Multimap<Range<Token>, InetAddress> myRangeAddresses = ArrayListMultimap.create();
- Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata);
- for (Range<Token> myRange : myRanges)
- {
- for (Range<Token> range : rangeAddresses.keySet())
- {
- if (range.contains(myRange))
- {
- List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address, rangeAddresses.get(range));
- myRangeAddresses.putAll(myRange, preferred);
- break;
- }
- }
- assert myRangeAddresses.keySet().contains(myRange);
- }
- return myRangeAddresses;
- }
-
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
Message message = new Message(FBUtilities.getBroadcastAddress(),
@@ -244,35 +184,6 @@ public class BootStrapper
throw new RuntimeException("Bootstrap failed, could not obtain token from: " + maxEndpoint);
}
- public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget)
- {
- return getWorkMap(rangesWithSourceTarget, FailureDetector.instance);
- }
-
- static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, IFailureDetector failureDetector)
- {
- /*
- * Map whose key is the source node and the value is a map whose key is the
- * target and value is the list of ranges to be sent to it.
- */
- Multimap<InetAddress, Range<Token>> sources = ArrayListMultimap.create();
-
- // TODO look for contiguous ranges and map them to the same source
- for (Range<Token> range : rangesWithSourceTarget.keySet())
- {
- for (InetAddress source : rangesWithSourceTarget.get(range))
- {
- // ignore the local IP...
- if (failureDetector.isAlive(source) && !source.equals(FBUtilities.getBroadcastAddress()))
- {
- sources.put(source, range);
- break;
- }
- }
- }
- return sources;
- }
-
public static class BootstrapTokenVerbHandler implements IVerbHandler
{
public void doVerb(Message message, String id)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
new file mode 100644
index 0000000..9b2b41a
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.streaming.StreamIn;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Assists in streaming ranges to a node.
+ */
+public class RangeStreamer
+{
+ private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
+
+ private final TokenMetadata metadata;
+ private final InetAddress address;
+ private final OperationType opType;
+ private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
+ private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+
+ /**
+ * A filter applied to sources to stream from when constructing a fetch map.
+ */
+ public static interface ISourceFilter
+ {
+ public boolean shouldInclude(InetAddress endpoint);
+ }
+
+ /**
+ * Source filter which excludes any endpoints that are not alive according to a
+ * failure detector.
+ */
+ public static class FailureDetectorSourceFilter implements ISourceFilter
+ {
+ private final IFailureDetector fd;
+
+ public FailureDetectorSourceFilter(IFailureDetector fd)
+ {
+ this.fd = fd;
+ }
+
+ public boolean shouldInclude(InetAddress endpoint)
+ {
+ return fd.isAlive(endpoint);
+ }
+ }
+
+ /**
+ * Source filter which excludes any endpoints that are not in a specific data center.
+ */
+ public static class SingleDatacenterFilter implements ISourceFilter
+ {
+ private final String sourceDc;
+ private final IEndpointSnitch snitch;
+
+ public SingleDatacenterFilter(IEndpointSnitch snitch, String sourceDc)
+ {
+ this.sourceDc = sourceDc;
+ this.snitch = snitch;
+ }
+
+ public boolean shouldInclude(InetAddress endpoint)
+ {
+ return snitch.getDatacenter(endpoint).equals(sourceDc);
+ }
+ }
+
+ public RangeStreamer(TokenMetadata metadata, InetAddress address, OperationType opType)
+ {
+ this.metadata = metadata;
+ this.address = address;
+ this.opType = opType;
+ }
+
+ public void addSourceFilter(ISourceFilter filter)
+ {
+ sourceFilters.add(filter);
+ }
+
+ public void addRanges(String table, Collection<Range<Token>> ranges)
+ {
+ Multimap<Range<Token>, InetAddress> rangesForTable = getAllRangesWithSourcesFor(table, ranges);
+
+ if (logger.isDebugEnabled())
+ {
+ for (Map.Entry<Range<Token>, InetAddress> entry: rangesForTable.entries())
+ logger.debug(String.format("%s: range %s exists on %s", opType, entry.getKey(), entry.getValue()));
+ }
+
+ for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForTable, sourceFilters).asMap().entrySet())
+ {
+ if (logger.isDebugEnabled())
+ {
+ for (Range r : entry.getValue())
+ logger.debug(String.format("%s: range %s from source %s for table %s", opType, r, entry.getKey(), table));
+ }
+ toFetch.put(table, entry);
+ }
+ }
+
+ /**
+ * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
+ * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
+ */
+ private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String table, Collection<Range<Token>> desiredRanges)
+ {
+ AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
+ Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata);
+
+ Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+ for (Range<Token> desiredRange : desiredRanges)
+ {
+ for (Range<Token> range : rangeAddresses.keySet())
+ {
+ if (range.contains(desiredRange))
+ {
+ List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address, rangeAddresses.get(range));
+ rangeSources.putAll(desiredRange, preferred);
+ break;
+ }
+ }
+
+ if (!rangeSources.keySet().contains(desiredRange))
+ throw new IllegalStateException("No sources found for " + desiredRange);
+ }
+
+ return rangeSources;
+ }
+
+ /**
+ * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
+ * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
+ * here, we always exclude ourselves.
+ * @return
+ */
+ private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
+ Collection<ISourceFilter> sourceFilters)
+ {
+ Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
+ for (Range<Token> range : rangesWithSources.keySet())
+ {
+ boolean foundSource = false;
+
+ outer:
+ for (InetAddress address : rangesWithSources.get(range))
+ {
+ if (address.equals(FBUtilities.getBroadcastAddress()))
+ continue;
+
+ for (ISourceFilter filter : sourceFilters)
+ {
+ if (!filter.shouldInclude(address))
+ continue outer;
+ }
+
+ rangeFetchMapMap.put(address, range);
+ foundSource = true;
+ break; // stream each range from a single source, matching the old getWorkMap behavior
+ }
+
+ if (!foundSource)
+ throw new IllegalStateException("unable to find sufficient sources for streaming range " + range);
+ }
+
+ return rangeFetchMapMap;
+ }
+
+ public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget)
+ {
+ return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)));
+ }
+
+ // For testing purposes
+ Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()
+ {
+ return toFetch;
+ }
+
+ public void fetch()
+ {
+ final CountDownLatch latch = new CountDownLatch(toFetch().entries().size());
+ for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
+ {
+ final String table = entry.getKey();
+ final InetAddress source = entry.getValue().getKey();
+ Collection<Range<Token>> ranges = entry.getValue().getValue();
+ /* Send messages to respective folks to stream data over to me */
+ Runnable callback = new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
+ source, table, opType, latch.getCount()));
+ }
+ };
+ if (logger.isDebugEnabled())
+ logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
+ StreamIn.requestRanges(source, table, ranges, callback, opType);
+ }
+
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4ebeddc..441a9bc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -44,6 +44,8 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
@@ -667,6 +669,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return joined;
}
+ public void rebuild(String sourceDc)
+ {
+ logger_.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
+
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata_, FBUtilities.getBroadcastAddress(), OperationType.REBUILD);
+ streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+ if (sourceDc != null)
+ streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
+
+ for (String table : Schema.instance.getNonSystemTables())
+ streamer.addRanges(table, getLocalRanges(table));
+
+ streamer.fetch();
+ }
+
public void setStreamThroughputMbPerSec(int value)
{
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
@@ -2289,7 +2306,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// associating table with range-to-endpoints map
rangesToStreamByTable.put(table, rangeWithEndpoints);
- Multimap<InetAddress, Range<Token>> workMap = BootStrapper.getWorkMap(rangesToFetchWithPreferredEndpoints);
+ Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
rangesToFetch.put(table, workMap);
if (logger_.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 9d0ed6e..6903cb2 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -345,6 +345,15 @@ public interface StorageServiceMBean
public boolean isIncrementalBackupsEnabled();
public void setIncrementalBackupsEnabled(boolean value);
+ /**
+ * Initiate a process of streaming data for which we are responsible from other nodes. It is similar to bootstrap
+ * except meant to be used on a node which is already in the cluster (typically containing no data) as an
+ * alternative to running repair.
+ *
+ * @param sourceDc Name of DC from which to select sources for streaming or null to pick any node
+ */
+ public void rebuild(String sourceDc);
+
public void bulkLoad(String directory);
public void rescheduleFailedDeletions();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/streaming/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OperationType.java b/src/java/org/apache/cassandra/streaming/OperationType.java
index ddf60ad..beb226b 100644
--- a/src/java/org/apache/cassandra/streaming/OperationType.java
+++ b/src/java/org/apache/cassandra/streaming/OperationType.java
@@ -27,6 +27,7 @@ public enum OperationType
BOOTSTRAP,
UNBOOTSTRAP,
RESTORE_REPLICA_COUNT,
- BULK_LOAD;
+ BULK_LOAD,
+ REBUILD
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 4e552ee..2e25436 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -111,7 +111,7 @@ public class StreamInSession
public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
{
if (logger.isDebugEnabled())
- logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+ logger.debug("Finished {} (from {}). Sending ack to {}", new Object[] {remoteFile, getHost(), this});
assert reader != null;
readers.add(reader);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 7692cd0..520b78f 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -102,6 +102,7 @@ public class NodeCmd
JOIN,
MOVE,
NETSTATS,
+ REBUILD,
REFRESH,
REMOVETOKEN,
REPAIR,
@@ -155,6 +156,7 @@ public class NodeCmd
addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling.");
addCmdHelp(header, "setstreamthroughput <value_in_mb>", "Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling.");
addCmdHelp(header, "describering [keyspace]", "Shows the token ranges info of a given keyspace.");
+ addCmdHelp(header, "rebuild [src-dc-name]", "Rebuild data by streaming from other nodes (similarly to bootstrap)");
// Two args
addCmdHelp(header, "snapshot [keyspaces...] -t [snapshotName]", "Take a snapshot of the specified keyspaces using optional name snapshotName");
@@ -702,6 +704,10 @@ public class NodeCmd
case SETSTREAMTHROUGHPUT :
if (arguments.length != 1) { badUse("Missing value argument."); }
probe.setStreamThroughput(Integer.valueOf(arguments[0]));
+ break;
+
+ case REBUILD :
+ if (arguments.length > 1) { badUse("Too many arguments."); }
+ probe.rebuild(arguments.length == 1 ? arguments[0] : null);
break;
case REMOVETOKEN :
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 621a9f6..22678ae 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -638,6 +638,11 @@ public class NodeProbe
{
return ssProxy.describeRingJMX(keyspaceName);
}
+
+ public void rebuild(String sourceDc)
+ {
+ ssProxy.rebuild(sourceDc);
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 1095dd3..72e5bf9 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -22,25 +22,26 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.HashMap;
+import java.util.Set;
import java.util.Map;
-import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.Schema;
-import org.apache.commons.lang.StringUtils;
-import static org.junit.Assert.assertEquals;
import org.junit.Test;
-import com.google.common.collect.Multimap;
-
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.assertEquals;
+
public class BootStrapperTest extends CleanupHelper
{
@Test
@@ -164,17 +165,7 @@ public class BootStrapperTest extends CleanupHelper
TokenMetadata tmd = ss.getTokenMetadata();
assertEquals(numOldNodes, tmd.sortedTokens().size());
- BootStrapper b = new BootStrapper(myEndpoint, myToken, tmd);
- Multimap<Range<Token>, InetAddress> res = b.getRangesWithSources(table);
-
- int transferCount = 0;
- for (Map.Entry<Range<Token>, Collection<InetAddress>> e : res.asMap().entrySet())
- {
- assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(), ", ");
- transferCount++;
- }
-
- assertEquals(replicationFactor, transferCount);
+ RangeStreamer s = new RangeStreamer(tmd, myEndpoint, OperationType.BOOTSTRAP);
IFailureDetector mockFailureDetector = new IFailureDetector()
{
public boolean isAlive(InetAddress ep)
@@ -189,12 +180,22 @@ public class BootStrapperTest extends CleanupHelper
public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
public void clear(InetAddress ep) { throw new UnsupportedOperationException(); }
};
- Multimap<InetAddress, Range<Token>> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
+ s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+ s.addRanges(table, Table.open(table).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
+
+ Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(table);
+
+ // Check we get RF new ranges in total
+ Set<Range<Token>> ranges = new HashSet<Range<Token>>();
+ for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch)
+ ranges.addAll(e.getValue());
+
+ assertEquals(replicationFactor, ranges.size());
+
// there isn't any point in testing the size of these collections for any specific size. When a random partitioner
// is used, they will vary.
- assert temp.keySet().size() > 0;
- assert temp.asMap().values().iterator().next().size() > 0;
- assert !temp.keySet().iterator().next().equals(myEndpoint);
+ assert toFetch.iterator().next().getValue().size() > 0;
+ assert !toFetch.iterator().next().getKey().equals(myEndpoint);
}
private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException