You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/02 18:03:39 UTC

git commit: Allow repairing only against nodes in the local DC

Updated Branches:
  refs/heads/trunk d41d5f6db -> 5eb530853


Allow repairing only against nodes in the local DC

patch by Alexey Zotov; reviewed by slebresne for CASSANDRA-4747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eb53085
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eb53085
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eb53085

Branch: refs/heads/trunk
Commit: 5eb53085398af4d30002785ee21f50b894208735
Parents: d41d5f6
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 2 18:01:21 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 2 18:03:32 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../cassandra/service/AntiEntropyService.java      |   33 ++++++--
 .../apache/cassandra/service/StorageService.java   |   16 ++--
 .../cassandra/service/StorageServiceMBean.java     |    6 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    7 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   12 ++--
 .../service/AntiEntropyServiceTestAbstract.java    |   63 +++++++++++++-
 7 files changed, 106 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9067ba8..b33e40c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,7 +16,7 @@
  * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
  * Add support for batchlog in CQL3 (CASSANDRA-4545)
  * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
-
+ * Support repairing only the local DC nodes (CASSANDRA-4747)
 
 1.2-beta1
  * add atomic_batch_mutate (CASSANDRA-4542, -4635)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 474566e..a7fa157 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +45,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.streaming.StreamingRepairTask;
 import org.apache.cassandra.utils.*;
@@ -113,9 +116,9 @@ public class AntiEntropyService
     /**
      * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
     {
-        RepairFuture futureTask = new RepairSession(range, tablename, isSequential, cfnames).getFuture();
+        RepairFuture futureTask = new RepairSession(range, tablename, isSequential, isLocal, cfnames).getFuture();
         executor.execute(futureTask);
         return futureTask;
     }
@@ -139,8 +142,14 @@ public class AntiEntropyService
 
     /**
      * Return all of the neighbors with whom we share the provided range.
+     *
+     * @param table table to repair
+     * @param toRepair token to repair
+     * @param isLocal need to use only nodes from local datacenter
+     *
+     * @return neighbors with whom we share the provided range
      */
-    static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair)
+    static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
     {
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
@@ -162,6 +171,14 @@ public class AntiEntropyService
 
         Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
+
+        if (isLocal)
+        {
+            TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
+            Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+            return Sets.intersection(neighbors, localEndpoints);
+        }
+
         return neighbors;
     }
 
@@ -575,16 +592,16 @@ public class AntiEntropyService
 
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
-            this(req.sessionid, req.range, tablename, false, cfnames);
+            this(req.sessionid, req.range, tablename, false, false, cfnames);
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
-        public RepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
+        public RepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
         {
-            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, cfnames);
+            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, isLocal, cfnames);
         }
 
-        private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, String[] cfnames)
+        private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String[] cfnames)
         {
             this.sessionName = id;
             this.isSequential = isSequential;
@@ -592,7 +609,7 @@ public class AntiEntropyService
             this.cfnames = cfnames;
             assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
             this.range = range;
-            this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
+            this.endpoints = AntiEntropyService.getNeighbors(tablename, range, isLocal);
         }
 
         public String getName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 723a838..01b9e80 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2140,7 +2140,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+    public void forceTableRepair(final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_KS.equals(tableName))
             return;
@@ -2152,7 +2152,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
         for (Range<Token> range : ranges)
         {
-            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
             if (future == null)
                 continue;
             futures.add(future);
@@ -2189,7 +2189,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger.info("Repair command #{} completed successfully", cmd);
     }
 
-    public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_KS.equals(tableName))
             return;
@@ -2197,7 +2197,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
         for (Range<Token> range : getLocalPrimaryRanges())
         {
-            RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+            RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
             if (future != null)
                 futures.add(future);
         }
@@ -2207,7 +2207,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             FBUtilities.waitOnFuture(future);
     }
 
-    public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+    public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_KS.equals(tableName))
             return;
@@ -2217,7 +2217,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
                      new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
-        AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
+        AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, isLocal, columnFamilies);
         if (future == null)
             return;
         try
@@ -2230,7 +2230,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
-    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
@@ -2244,7 +2244,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             return null;
         }
 
-        return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, names.toArray(new String[names.size()]));
+        return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, isLocal, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ded5298..e51184a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -265,12 +265,12 @@ public interface StorageServiceMBean
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+    public void forceTableRepair(String tableName, boolean isSequential, boolean  isLocal, String... columnFamilies) throws IOException;
 
     /**
      * Triggers proactive repair but only for the node primary range.
      */
-    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean  isLocal, String... columnFamilies) throws IOException;
 
     /**
      * Perform repair of a specific range.
@@ -278,7 +278,7 @@ public interface StorageServiceMBean
      * This allows incremental repair to be performed by having an external controller submitting repair jobs.
      * Note that the provided range much be a subset of one of the node local range.
      */
-    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean  isLocal, String... columnFamilies) throws IOException;
 
     public void forceTerminateAllRepairSessions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index ae65832..f18a93f 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -69,6 +69,7 @@ public class NodeCmd
     private static final Pair<String, String> TOKENS_OPT = Pair.create("T", "tokens");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
     private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = Pair.create("snapshot", "with-snapshot");
+    private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
 
     private static final String DEFAULT_HOST = "127.0.0.1";
     private static final int DEFAULT_PORT = 7199;
@@ -88,6 +89,7 @@ public class NodeCmd
         options.addOption(TOKENS_OPT,   false, "display all tokens");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
+        options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
     }
 
     public NodeCmd(NodeProbe probe)
@@ -1333,10 +1335,11 @@ public class NodeCmd
             {
                 case REPAIR  :
                     boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
+                    boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
                     if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
+                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, localDC, columnFamilies);
                     else
-                        probe.forceTableRepair(keyspace, snapshot, columnFamilies);
+                        probe.forceTableRepair(keyspace, snapshot, localDC, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d5968c1..7774122 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -199,19 +199,19 @@ public class NodeProbe
         ssProxy.forceTableFlush(tableName, columnFamilies);
     }
 
-    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+    public void forceTableRepair(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
+        ssProxy.forceTableRepair(tableName, isSequential, isLocal, columnFamilies);
     }
 
-    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
+        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies);
     }
 
-    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
+    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
+        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal, columnFamilies);
     }
 
     public void invalidateKeyCache() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index dd4e1f2..87cc910 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -22,7 +22,8 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
-import org.apache.cassandra.config.Schema;
+import com.google.common.collect.Sets;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +32,8 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
@@ -39,13 +42,13 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import static org.apache.cassandra.service.AntiEntropyService.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
-import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 {
@@ -169,7 +172,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range));
+            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, false));
         }
         assertEquals(expected, neighbors);
     }
@@ -192,7 +195,57 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range));
+            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, false));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        
+        // generate rf+1 nodes, and ensure that all nodes are returned
+        Set<InetAddress> expected = addTokens(1 + Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+        expected.remove(FBUtilities.getBroadcastAddress());
+        // remove remote endpoints
+        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+        expected = Sets.intersection(expected, localEndpoints);
+
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename);
+        Set<InetAddress> neighbors = new HashSet<InetAddress>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, true));
+        }
+        assertEquals(expected, neighbors);
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+        addTokens(2 * Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = Table.open(tablename).getReplicationStrategy();
+        Set<InetAddress> expected = new HashSet<InetAddress>();
+        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+        {
+            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+        }
+        expected.remove(FBUtilities.getBroadcastAddress());
+        // remove remote endpoints
+        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+        expected = Sets.intersection(expected, localEndpoints);
+        
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename);
+        Set<InetAddress> neighbors = new HashSet<InetAddress>();
+        for (Range<Token> range : ranges)
+        {
+            neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, true));
         }
         assertEquals(expected, neighbors);
     }