You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/02 18:03:39 UTC
git commit: Allow repairing only the nodes in the local DC
Updated Branches:
refs/heads/trunk d41d5f6db -> 5eb530853
Allow repairing only the nodes in the local DC
patch by Alexey Zotov; reviewed by slebresne for CASSANDRA-4747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eb53085
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eb53085
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eb53085
Branch: refs/heads/trunk
Commit: 5eb53085398af4d30002785ee21f50b894208735
Parents: d41d5f6
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 2 18:01:21 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 2 18:03:32 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../cassandra/service/AntiEntropyService.java | 33 ++++++--
.../apache/cassandra/service/StorageService.java | 16 ++--
.../cassandra/service/StorageServiceMBean.java | 6 +-
src/java/org/apache/cassandra/tools/NodeCmd.java | 7 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 12 ++--
.../service/AntiEntropyServiceTestAbstract.java | 63 +++++++++++++-
7 files changed, 106 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9067ba8..b33e40c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,7 +16,7 @@
* Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
* Add support for batchlog in CQL3 (CASSANDRA-4545)
* Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
-
+ * Support repairing only the local DC nodes (CASSANDRA-4747)
1.2-beta1
* add atomic_batch_mutate (CASSANDRA-4542, -4635)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 474566e..a7fa157 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.*;
import org.apache.cassandra.streaming.StreamingRepairTask;
import org.apache.cassandra.utils.*;
@@ -113,9 +116,9 @@ public class AntiEntropyService
/**
* Requests repairs for the given table and column families: submits the repair session to the executor and returns its future without waiting for the repairs to complete.
*/
- public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
+ public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
{
- RepairFuture futureTask = new RepairSession(range, tablename, isSequential, cfnames).getFuture();
+ RepairFuture futureTask = new RepairSession(range, tablename, isSequential, isLocal, cfnames).getFuture();
executor.execute(futureTask);
return futureTask;
}
@@ -139,8 +142,14 @@ public class AntiEntropyService
/**
* Return all of the neighbors with whom we share the provided range.
+ *
+ * @param table table to repair
+ * @param toRepair token range to repair
+ * @param isLocal whether to use only nodes from the local datacenter
+ *
+ * @return neighbors with whom we share the provided range
*/
- static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair)
+ static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
@@ -162,6 +171,14 @@ public class AntiEntropyService
Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
neighbors.remove(FBUtilities.getBroadcastAddress());
+
+ if (isLocal)
+ {
+ TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
+ Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ return Sets.intersection(neighbors, localEndpoints);
+ }
+
return neighbors;
}
@@ -575,16 +592,16 @@ public class AntiEntropyService
public RepairSession(TreeRequest req, String tablename, String... cfnames)
{
- this(req.sessionid, req.range, tablename, false, cfnames);
+ this(req.sessionid, req.range, tablename, false, false, cfnames);
AntiEntropyService.instance.sessions.put(getName(), this);
}
- public RepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
+ public RepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
{
- this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, cfnames);
+ this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, isLocal, cfnames);
}
- private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, String[] cfnames)
+ private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String[] cfnames)
{
this.sessionName = id;
this.isSequential = isSequential;
@@ -592,7 +609,7 @@ public class AntiEntropyService
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.range = range;
- this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
+ this.endpoints = AntiEntropyService.getNeighbors(tablename, range, isLocal);
}
public String getName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 723a838..01b9e80 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2140,7 +2140,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
* @param columnFamilies
* @throws IOException
*/
- public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ public void forceTableRepair(final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
if (Table.SYSTEM_KS.equals(tableName))
return;
@@ -2152,7 +2152,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
for (Range<Token> range : ranges)
{
- AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+ AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
if (future == null)
continue;
futures.add(future);
@@ -2189,7 +2189,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.info("Repair command #{} completed successfully", cmd);
}
- public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
if (Table.SYSTEM_KS.equals(tableName))
return;
@@ -2197,7 +2197,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
for (Range<Token> range : getLocalPrimaryRanges())
{
- RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+ RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
if (future != null)
futures.add(future);
}
@@ -2207,7 +2207,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
FBUtilities.waitOnFuture(future);
}
- public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
if (Table.SYSTEM_KS.equals(tableName))
return;
@@ -2217,7 +2217,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
- AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
+ AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, isLocal, columnFamilies);
if (future == null)
return;
try
@@ -2230,7 +2230,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
+ public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
@@ -2244,7 +2244,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return null;
}
- return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, names.toArray(new String[names.size()]));
+ return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, isLocal, names.toArray(new String[names.size()]));
}
public void forceTerminateAllRepairSessions() {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ded5298..e51184a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -265,12 +265,12 @@ public interface StorageServiceMBean
* @param columnFamilies
* @throws IOException
*/
- public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+ public void forceTableRepair(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
/**
* Triggers proactive repair but only for the node primary range.
*/
- public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+ public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
/**
* Perform repair of a specific range.
@@ -278,7 +278,7 @@ public interface StorageServiceMBean
* This allows incremental repair to be performed by having an external controller submitting repair jobs.
* Note that the provided range must be a subset of one of the node's local ranges.
*/
- public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException;
+ public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
public void forceTerminateAllRepairSessions();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index ae65832..f18a93f 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -69,6 +69,7 @@ public class NodeCmd
private static final Pair<String, String> TOKENS_OPT = Pair.create("T", "tokens");
private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = Pair.create("snapshot", "with-snapshot");
+ private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 7199;
@@ -88,6 +89,7 @@ public class NodeCmd
options.addOption(TOKENS_OPT, false, "display all tokens");
options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
+ options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
}
public NodeCmd(NodeProbe probe)
@@ -1333,10 +1335,11 @@ public class NodeCmd
{
case REPAIR :
boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
+ boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
- probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
+ probe.forceTableRepairPrimaryRange(keyspace, snapshot, localDC, columnFamilies);
else
- probe.forceTableRepair(keyspace, snapshot, columnFamilies);
+ probe.forceTableRepair(keyspace, snapshot, localDC, columnFamilies);
break;
case FLUSH :
try { probe.forceTableFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d5968c1..7774122 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -199,19 +199,19 @@ public class NodeProbe
ssProxy.forceTableFlush(tableName, columnFamilies);
}
- public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepair(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepair(tableName, isSequential, isLocal, columnFamilies);
}
- public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies);
}
- public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
+ ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal, columnFamilies);
}
public void invalidateKeyCache() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb53085/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index dd4e1f2..87cc910 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -22,7 +22,8 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
-import org.apache.cassandra.config.Schema;
+import com.google.common.collect.Sets;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -31,6 +32,8 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.dht.IPartitioner;
@@ -39,13 +42,13 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
+import static org.apache.cassandra.service.AntiEntropyService.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
-import static org.apache.cassandra.service.AntiEntropyService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
{
@@ -169,7 +172,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range));
+ neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, false));
}
assertEquals(expected, neighbors);
}
@@ -192,7 +195,57 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range));
+ neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, false));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsPlusOneInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename);
+ Set<InetAddress> neighbors = new HashSet<InetAddress>();
+ for (Range<Token> range : ranges)
+ {
+ neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, true));
+ }
+ assertEquals(expected, neighbors);
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+ addTokens(2 * Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Table.open(tablename).getReplicationStrategy();
+ Set<InetAddress> expected = new HashSet<InetAddress>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getBroadcastAddress());
+ // remove remote endpoints
+ TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
+ HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ expected = Sets.intersection(expected, localEndpoints);
+
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(tablename);
+ Set<InetAddress> neighbors = new HashSet<InetAddress>();
+ for (Range<Token> range : ranges)
+ {
+ neighbors.addAll(AntiEntropyService.getNeighbors(tablename, range, true));
}
assertEquals(expected, neighbors);
}