You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/04/17 09:30:08 UTC
[2/7] git commit: Expose repairing by a user provided range
Expose repairing by a user provided range
patch by scode and slebresne; reviewed by stuhood for CASSANDRA-3912
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b69fd1af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b69fd1af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b69fd1af
Branch: refs/heads/trunk
Commit: b69fd1aff7e34363298aece693c4be5a3a603c71
Parents: 3f09b79
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Apr 3 11:57:29 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Apr 17 09:26:17 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/service/AntiEntropyService.java | 22 +++++++++--
.../apache/cassandra/service/StorageService.java | 29 +++++++++++++++
.../cassandra/service/StorageServiceMBean.java | 13 +++++++
src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +++
5 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a985ff..aa6153c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@
* identify and blacklist corrupted SSTables from future compactions
(CASSANDRA-2261)
* Move CfDef and KsDef validation out of thrift (CASSANDRA-4037)
+ * Expose repairing by a user provided range (CASSANDRA-3912)
+
1.1-dev
* Allow KS and CF names up to 48 characters (CASSANDRA-4157)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index a39ed75..0c11947 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -145,15 +145,29 @@ public class AntiEntropyService
}
/**
- * Return all of the neighbors with whom we share data.
+ * Return all of the neighbors with whom we share the provided range.
*/
- static Set<InetAddress> getNeighbors(String table, Range<Token> range)
+ static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
- if (!replicaSets.containsKey(range))
+ Range<Token> rangeSuperSet = null;
+ for (Range<Token> range : ss.getLocalRanges(table))
+ {
+ if (range.contains(toRepair))
+ {
+ rangeSuperSet = range;
+ break;
+ }
+ else if (range.intersects(toRepair))
+ {
+ throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
+ }
+ }
+ if (rangeSuperSet == null || !replicaSets.containsKey(toRepair))
return Collections.emptySet();
- Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range));
+
+ Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
neighbors.remove(FBUtilities.getBroadcastAddress());
// Excluding all node with version <= 0.7 since they don't know how to
// create a correct merkle tree (they build it over the full range)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5e12364..3ae8b0a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -210,6 +210,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
}
+ // For JMX's sake. Use getLocalPrimaryRange for internal uses
+ public List<String> getPrimaryRange()
+ {
+ return getLocalPrimaryRange().asList();
+ }
+
private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
private CassandraDaemon daemon;
@@ -1951,6 +1957,29 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
+ public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ {
+ if (Table.SYSTEM_TABLE.equals(tableName))
+ return;
+
+ Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+ Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+ logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+ new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
+ AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
+ if (future == null)
+ return;
+ try
+ {
+ future.get();
+ }
+ catch (Exception e)
+ {
+ logger_.error("Repair session " + future.session.getName() + " failed.", e);
+ }
+ }
+
public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be727ad..9da3896 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -136,6 +136,11 @@ public interface StorageServiceMBean
public List <String> describeRingJMX(String keyspace) throws InvalidRequestException;
/**
+ * Returns the local node's primary range.
+ */
+ public List<String> getPrimaryRange();
+
+ /**
* Retrieve a map of pending ranges to endpoints that describe the ring topology
* @param keyspace the keyspace to get the pending range map for.
* @return a map of pending ranges to endpoints
@@ -241,6 +246,14 @@ public interface StorageServiceMBean
*/
public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+ /**
+ * Perform repair of a specific range.
+ *
+ * This allows incremental repair to be performed by having an external controller submitting repair jobs.
+ * Note that the provided range much be a subset of one of the node local range.
+ */
+ public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+
public void forceTerminateAllRepairSessions();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 9609b65..f042353 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -208,6 +208,11 @@ public class NodeProbe
ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
}
+ public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ {
+ ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
+ }
+
public void invalidateKeyCache() throws IOException
{
cacheService.invalidateKeyCache();