You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/04/17 09:30:08 UTC

[2/7] git commit: Expose repairing by a user provided range

Expose repairing by a user provided range

patch by scode and slebresne; reviewed by stuhood for CASSANDRA-3912


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b69fd1af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b69fd1af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b69fd1af

Branch: refs/heads/trunk
Commit: b69fd1aff7e34363298aece693c4be5a3a603c71
Parents: 3f09b79
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Apr 3 11:57:29 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Apr 17 09:26:17 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../cassandra/service/AntiEntropyService.java      |   22 +++++++++--
 .../apache/cassandra/service/StorageService.java   |   29 +++++++++++++++
 .../cassandra/service/StorageServiceMBean.java     |   13 +++++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |    5 +++
 5 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a985ff..aa6153c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@
  * identify and blacklist corrupted SSTables from future compactions 
    (CASSANDRA-2261)
  * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037)
+ * Expose repairing by a user provided range (CASSANDRA-3912)
+
 
 1.1-dev
  * Allow KS and CF names up to 48 characters (CASSANDRA-4157)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index a39ed75..0c11947 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -145,15 +145,29 @@ public class AntiEntropyService
     }
 
     /**
-     * Return all of the neighbors with whom we share data.
+     * Return all of the neighbors with whom we share the provided range.
      */
-    static Set<InetAddress> getNeighbors(String table, Range<Token> range)
+    static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair)
     {
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
-        if (!replicaSets.containsKey(range))
+        Range<Token> rangeSuperSet = null;
+        for (Range<Token> range : ss.getLocalRanges(table))
+        {
+            if (range.contains(toRepair))
+            {
+                rangeSuperSet = range;
+                break;
+            }
+            else if (range.intersects(toRepair))
+            {
+                throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
+            }
+        }
+        if (rangeSuperSet == null || !replicaSets.containsKey(toRepair))
             return Collections.emptySet();
-        Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range));
+
+        Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
         // Excluding all node with version <= 0.7 since they don't know how to
         // create a correct merkle tree (they build it over the full range)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5e12364..3ae8b0a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -210,6 +210,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
     }
 
+    // For JMX's sake. Use getLocalPrimaryRange for internal uses
+    public List<String> getPrimaryRange()
+    {
+        return getLocalPrimaryRange().asList();
+    }
+
     private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
     private CassandraDaemon daemon;
 
@@ -1951,6 +1957,29 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
+    public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+    {
+        if (Table.SYSTEM_TABLE.equals(tableName))
+            return;
+
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+                     new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
+        AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
+        if (future == null)
+            return;
+        try
+        {
+            future.get();
+        }
+        catch (Exception e)
+        {
+            logger_.error("Repair session " + future.session.getName() + " failed.", e);
+        }
+    }
+
     public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be727ad..9da3896 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -136,6 +136,11 @@ public interface StorageServiceMBean
     public List <String> describeRingJMX(String keyspace) throws InvalidRequestException;
 
     /**
+     * Returns the local node's primary range.
+     */
+    public List<String> getPrimaryRange();
+
+    /**
      * Retrieve a map of pending ranges to endpoints that describe the ring topology
      * @param keyspace the keyspace to get the pending range map for.
      * @return a map of pending ranges to endpoints
@@ -241,6 +246,14 @@ public interface StorageServiceMBean
      */
     public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
 
+    /**
+     * Perform repair of a specific range.
+     *
+     * This allows incremental repair to be performed by having an external controller submitting repair jobs.
+     * Note that the provided range much be a subset of one of the node local range.
+     */
+    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+
     public void forceTerminateAllRepairSessions();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b69fd1af/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 9609b65..f042353 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -208,6 +208,11 @@ public class NodeProbe
         ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
     }
 
+    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
+    {
+        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
+    }
+
     public void invalidateKeyCache() throws IOException
     {
         cacheService.invalidateKeyCache();