You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/09/17 16:48:47 UTC
[1/3] git commit: Make repair -pr work with -local
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 90a211455 -> 125b4642d
refs/heads/trunk 50c27cca4 -> 87ba0dc1b
Make repair -pr work with -local
patch by Paulo Motta; reviewed by yukim for CASSANDRA-7450
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/125b4642
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/125b4642
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/125b4642
Branch: refs/heads/cassandra-2.1
Commit: 125b4642db63a37c560dec4615ea04662c6e45ef
Parents: 90a2114
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 17 09:46:52 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:46:52 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 92 +++++---
.../org/apache/cassandra/tools/NodeTool.java | 2 +-
.../service/StorageServiceServerTest.java | 210 ++++++++++++++++++-
3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1aa3b24..b1fe20e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -43,7 +43,6 @@ import ch.qos.logback.classic.jmx.JMXConfiguratorMBean;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.util.concurrent.FutureCallback;
@@ -156,11 +155,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
}
- public Collection<Range<Token>> getLocalPrimaryRanges(String keyspace)
+ public Collection<Range<Token>> getPrimaryRanges(String keyspace)
{
return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
}
+ public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
+ {
+ return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+ }
+
private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
private CassandraDaemon daemon;
@@ -332,7 +336,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IllegalStateException("No configured daemon");
}
-
+
try
{
daemon.nativeServer.start();
@@ -430,10 +434,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
-
+
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
- return tokens;
+ return tokens;
}
catch (IOException e)
{
@@ -2464,12 +2468,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- // when repairing only primary range, dataCenter nor hosts can be set
- if (primaryRange && (dataCenters != null || hosts != null))
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
+ {
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (dataCenters == null && hosts == null)
+ ranges = getPrimaryRanges(keyspace);
+ // except when dataCenters contains only the local DC (i.e. -local)
+ else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ ranges = getPrimaryRangesWithinDC(keyspace);
+ else
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
+ else
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = getLocalRanges(keyspace);
}
- Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
@@ -2494,12 +2508,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
- // when repairing only primary range, you cannot repair only on local DC
- if (primaryRange && isLocal)
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
+ {
+ ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
+ }
+ else
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = getLocalRanges(keyspace);
}
- Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+
return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
@@ -2723,7 +2741,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* The node that stores replica primarily is defined as the first node returned
* by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
*
- * @param keyspace
+ * @param keyspace Keyspace name to check primary ranges
* @param ep endpoint we are interested in.
* @return primary ranges for the specified endpoint.
*/
@@ -2742,20 +2760,38 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Previously, primary range is the range that the node is responsible for and calculated
- * only from the token assigned to the node.
- * But this does not take replication strategy into account, and therefore returns insufficient
- * range especially using NTS with replication only to certain DC(see CASSANDRA-5424).
+ * Get the "primary ranges" within local DC for the specified keyspace and endpoint.
*
- * @deprecated
- * @param ep endpoint we are interested in.
- * @return range for the specified endpoint.
+ * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+ * @param keyspace Keyspace name to check primary ranges
+ * @param referenceEndpoint endpoint we are interested in.
+ * @return primary ranges within local DC for the specified endpoint.
*/
- @Deprecated
- @VisibleForTesting
- public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
+ public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
{
- return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
+ TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
+ String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
+ Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+ Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
+ for (Token token : metadata.sortedTokens())
+ {
+ List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+ for (InetAddress endpoint : endpoints)
+ {
+ if (localDcNodes.contains(endpoint))
+ {
+ if (endpoint.equals(referenceEndpoint))
+ {
+ localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
+ }
+ break;
+ }
+ }
+ }
+
+ return localDCPrimaryRanges;
}
/**
@@ -2861,7 +2897,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// if both classQualifer and rawLevel are empty, reload from configuration
if (StringUtils.isBlank(classQualifier) && StringUtils.isBlank(rawLevel) )
{
- JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
+ JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
new ObjectName("ch.qos.logback.classic:Name=default,Type=ch.qos.logback.classic.jmx.JMXConfigurator"),
JMXConfiguratorMBean.class);
jmxConfiguratorMBean.reloadDefaultConfiguration();
@@ -2879,7 +2915,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logBackLogger.setLevel(level);
logger.info("set log level to {} for classes under '{}' (if the level doesn't look like '{}' then the logger couldn't parse '{}')", level, classQualifier, rawLevel, rawLevel);
}
-
+
/**
* @return the runtime logging levels for all the configured loggers
*/
@@ -3228,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
oldEndpoints.removeAll(newEndpoints);
- //No relocation required
+ //No relocation required
if (oldEndpoints.isEmpty())
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 91f9bf0..1d7b1ad 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1674,7 +1674,7 @@ public class NodeTool
List<String> keyspaces = parseOptionalKeyspace(args, probe);
String[] cfnames = parseOptionalColumnFamilies(args);
- if (primaryRange && (localDC || !specificHosts.isEmpty() || !specificHosts.isEmpty()))
+ if (primaryRange && (!specificDataCenters.isEmpty() || !specificHosts.isEmpty()))
throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
for (String keyspace : keyspaces)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 81c5261..84b7a3c 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Keyspace;
@@ -39,8 +41,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.StringToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PropertyFileSnitch;
import org.apache.cassandra.locator.TokenMetadata;
@@ -81,14 +81,14 @@ public class StorageServiceServerTest
public void testGetAllRangesEmpty()
{
List<Token> toks = Collections.emptyList();
- assertEquals(Collections.emptyList(), StorageService.instance.getAllRanges(toks));
+ assertEquals(Collections.<Range<Token>>emptyList(), StorageService.instance.getAllRanges(toks));
}
@Test
public void testSnapshot() throws IOException
{
// no need to insert extra data, even an "empty" database will have a little information in the system keyspace
- StorageService.instance.takeSnapshot("snapshot", new String[0]);
+ StorageService.instance.takeSnapshot("snapshot");
}
@Test
@@ -99,6 +99,50 @@ public class StorageServiceServerTest
}
@Test
+ public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "1");
+ configOptions.put("DC2", "1");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+ InetAddress.getByName("127.0.0.1"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ }
+
+ @Test
public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -110,7 +154,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC1", "1");
configOptions.put("DC2", "1");
@@ -147,7 +191,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC2", "2");
Keyspace.clear("Keyspace1");
@@ -174,6 +218,45 @@ public class StorageServiceServerTest
}
@Test
+ public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategyOneDCOnly() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC2", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ // endpoints in DC1 should not have primary range
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assertTrue(primaryRanges.isEmpty());
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+ InetAddress.getByName("127.0.0.2"));
+ assertTrue(primaryRanges.isEmpty());
+
+ // endpoints in DC2 should have primary ranges which also cover DC1
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertTrue(primaryRanges.size() == 2);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertTrue(primaryRanges.size() == 2);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ }
+
+ @Test
public void testPrimaryRangesWithVnodes() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -197,7 +280,7 @@ public class StorageServiceServerTest
dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
metadata.updateNormalTokens(dc2);
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC2", "2");
Keyspace.clear("Keyspace1");
@@ -235,6 +318,86 @@ public class StorageServiceServerTest
assert primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I")));
assert primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J")));
}
+
+ @Test
+ public void testPrimaryRangeForEndpointWithinDCWithVnodes() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ Multimap<InetAddress, Token> dc1 = HashMultimap.create();
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+ metadata.updateNormalTokens(dc1);
+
+ // DC2
+ Multimap<InetAddress, Token> dc2 = HashMultimap.create();
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+ metadata.updateNormalTokens(dc2);
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "1");
+ configOptions.put("DC2", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ // endpoints in DC1 should have primary ranges which also cover DC2
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assertEquals(8, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+
+ // endpoints in DC1 should have primary ranges which also cover DC2
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assertEquals(4, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+
+ // endpoints in DC2 should have primary ranges which also cover DC1
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertEquals(4, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+ // because /127.0.0.4 holds token "B" which is the next to token "A" from /127.0.0.1,
+ // the node covers range (L, A]
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertTrue(primaryRanges.size() == 8);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+ // ranges from /127.0.0.1
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+ // the next token to "H" in DC2 is "K" in /127.0.0.5, so (G, H] goes to /127.0.0.5
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+ // ranges from /127.0.0.2
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+ }
+
@Test
public void testPrimaryRangesWithSimpleStrategy() throws Exception
{
@@ -245,7 +408,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("replication_factor", "2");
Keyspace.clear("Keyspace1");
@@ -264,4 +427,35 @@ public class StorageServiceServerTest
assert primaryRanges.size() == 1;
assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
}
+
+ /* It does not make much sense to use -local and -pr with SimpleStrategy, but this case is tested to guard against human error */
+ @Test
+ public void testPrimaryRangeForEndpointWithinDCWithSimpleStrategy() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("replication_factor", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "SimpleStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
+ }
}
[2/3] git commit: Make repair -pr work with -local
Posted by yu...@apache.org.
Make repair -pr work with -local
patch by Paulo Motta; reviewed by yukim for CASSANDRA-7450
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/125b4642
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/125b4642
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/125b4642
Branch: refs/heads/trunk
Commit: 125b4642db63a37c560dec4615ea04662c6e45ef
Parents: 90a2114
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 17 09:46:52 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:46:52 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 92 +++++---
.../org/apache/cassandra/tools/NodeTool.java | 2 +-
.../service/StorageServiceServerTest.java | 210 ++++++++++++++++++-
3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1aa3b24..b1fe20e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -43,7 +43,6 @@ import ch.qos.logback.classic.jmx.JMXConfiguratorMBean;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.util.concurrent.FutureCallback;
@@ -156,11 +155,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
}
- public Collection<Range<Token>> getLocalPrimaryRanges(String keyspace)
+ public Collection<Range<Token>> getPrimaryRanges(String keyspace)
{
return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
}
+ public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
+ {
+ return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+ }
+
private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
private CassandraDaemon daemon;
@@ -332,7 +336,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IllegalStateException("No configured daemon");
}
-
+
try
{
daemon.nativeServer.start();
@@ -430,10 +434,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
-
+
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
- return tokens;
+ return tokens;
}
catch (IOException e)
{
@@ -2464,12 +2468,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
- // when repairing only primary range, dataCenter nor hosts can be set
- if (primaryRange && (dataCenters != null || hosts != null))
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
+ {
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (dataCenters == null && hosts == null)
+ ranges = getPrimaryRanges(keyspace);
+ // except when dataCenters contains only the local DC (i.e. -local)
+ else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ ranges = getPrimaryRangesWithinDC(keyspace);
+ else
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
+ else
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = getLocalRanges(keyspace);
}
- Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
@@ -2494,12 +2508,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
- // when repairing only primary range, you cannot repair only on local DC
- if (primaryRange && isLocal)
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
+ {
+ ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
+ }
+ else
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = getLocalRanges(keyspace);
}
- Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+
return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
@@ -2723,7 +2741,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* The node that stores replica primarily is defined as the first node returned
* by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
*
- * @param keyspace
+ * @param keyspace name of the keyspace to check primary ranges for
* @param ep endpoint we are interested in.
* @return primary ranges for the specified endpoint.
*/
@@ -2742,20 +2760,38 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Previously, primary range is the range that the node is responsible for and calculated
- * only from the token assigned to the node.
- * But this does not take replication strategy into account, and therefore returns insufficient
- * range especially using NTS with replication only to certain DC(see CASSANDRA-5424).
+ * Get the "primary ranges" within local DC for the specified keyspace and endpoint.
*
- * @deprecated
- * @param ep endpoint we are interested in.
- * @return range for the specified endpoint.
+ * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+ * @param keyspace name of the keyspace to check primary ranges for
+ * @param referenceEndpoint endpoint we are interested in.
+ * @return primary ranges within local DC for the specified endpoint.
*/
- @Deprecated
- @VisibleForTesting
- public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
+ public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
{
- return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
+ TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
+ String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
+ Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+ AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+ Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
+ for (Token token : metadata.sortedTokens())
+ {
+ List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+ for (InetAddress endpoint : endpoints)
+ {
+ if (localDcNodes.contains(endpoint))
+ {
+ if (endpoint.equals(referenceEndpoint))
+ {
+ localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
+ }
+ break;
+ }
+ }
+ }
+
+ return localDCPrimaryRanges;
}
/**
@@ -2861,7 +2897,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// if both classQualifer and rawLevel are empty, reload from configuration
if (StringUtils.isBlank(classQualifier) && StringUtils.isBlank(rawLevel) )
{
- JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
+ JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
new ObjectName("ch.qos.logback.classic:Name=default,Type=ch.qos.logback.classic.jmx.JMXConfigurator"),
JMXConfiguratorMBean.class);
jmxConfiguratorMBean.reloadDefaultConfiguration();
@@ -2879,7 +2915,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logBackLogger.setLevel(level);
logger.info("set log level to {} for classes under '{}' (if the level doesn't look like '{}' then the logger couldn't parse '{}')", level, classQualifier, rawLevel, rawLevel);
}
-
+
/**
* @return the runtime logging levels for all the configured loggers
*/
@@ -3228,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
oldEndpoints.removeAll(newEndpoints);
- //No relocation required
+ //No relocation required
if (oldEndpoints.isEmpty())
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 91f9bf0..1d7b1ad 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1674,7 +1674,7 @@ public class NodeTool
List<String> keyspaces = parseOptionalKeyspace(args, probe);
String[] cfnames = parseOptionalColumnFamilies(args);
- if (primaryRange && (localDC || !specificHosts.isEmpty() || !specificHosts.isEmpty()))
+ if (primaryRange && (!specificDataCenters.isEmpty() || !specificHosts.isEmpty()))
throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
for (String keyspace : keyspaces)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 81c5261..84b7a3c 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Keyspace;
@@ -39,8 +41,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.StringToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PropertyFileSnitch;
import org.apache.cassandra.locator.TokenMetadata;
@@ -81,14 +81,14 @@ public class StorageServiceServerTest
public void testGetAllRangesEmpty()
{
List<Token> toks = Collections.emptyList();
- assertEquals(Collections.emptyList(), StorageService.instance.getAllRanges(toks));
+ assertEquals(Collections.<Range<Token>>emptyList(), StorageService.instance.getAllRanges(toks));
}
@Test
public void testSnapshot() throws IOException
{
// no need to insert extra data, even an "empty" database will have a little information in the system keyspace
- StorageService.instance.takeSnapshot("snapshot", new String[0]);
+ StorageService.instance.takeSnapshot("snapshot");
}
@Test
@@ -99,6 +99,50 @@ public class StorageServiceServerTest
}
@Test
+ public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "1");
+ configOptions.put("DC2", "1");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+ InetAddress.getByName("127.0.0.1"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertEquals(2, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ }
+
+ @Test
public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -110,7 +154,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC1", "1");
configOptions.put("DC2", "1");
@@ -147,7 +191,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC2", "2");
Keyspace.clear("Keyspace1");
@@ -174,6 +218,45 @@ public class StorageServiceServerTest
}
@Test
+ public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategyOneDCOnly() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC2", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ // endpoints in DC1 should not have primary range
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assertTrue(primaryRanges.isEmpty());
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+ InetAddress.getByName("127.0.0.2"));
+ assertTrue(primaryRanges.isEmpty());
+
+ // endpoints in DC2 should have primary ranges which also cover DC1
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertTrue(primaryRanges.size() == 2);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertTrue(primaryRanges.size() == 2);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ }
+
+ @Test
public void testPrimaryRangesWithVnodes() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -197,7 +280,7 @@ public class StorageServiceServerTest
dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
metadata.updateNormalTokens(dc2);
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("DC2", "2");
Keyspace.clear("Keyspace1");
@@ -235,6 +318,86 @@ public class StorageServiceServerTest
assert primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I")));
assert primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J")));
}
+
+ @Test
+ public void testPrimaryRangeForEndpointWithinDCWithVnodes() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ Multimap<InetAddress, Token> dc1 = HashMultimap.create();
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
+ dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
+ dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+ metadata.updateNormalTokens(dc1);
+
+ // DC2
+ Multimap<InetAddress, Token> dc2 = HashMultimap.create();
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
+ dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
+ dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+ metadata.updateNormalTokens(dc2);
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "1");
+ configOptions.put("DC2", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ // endpoints in DC1 should have primary ranges which also cover DC2
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assertEquals(8, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+
+ // endpoints in DC1 should have primary ranges which also cover DC2
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assertEquals(4, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+
+ // endpoints in DC2 should have primary ranges which also cover DC1
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+ assertEquals(4, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+ // because /127.0.0.4 holds token "B", which is next to token "A" from /127.0.0.1,
+ // the node covers range (L, A]
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+ assertTrue(primaryRanges.size() == 8);
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+ // ranges from /127.0.0.1
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+ // the next token to "H" in DC2 is "K" in /127.0.0.5, so (G, H] goes to /127.0.0.5
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+ // ranges from /127.0.0.2
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+ }
+
@Test
public void testPrimaryRangesWithSimpleStrategy() throws Exception
{
@@ -245,7 +408,7 @@ public class StorageServiceServerTest
metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
- Map<String, String> configOptions = new HashMap<String, String>();
+ Map<String, String> configOptions = new HashMap<>();
configOptions.put("replication_factor", "2");
Keyspace.clear("Keyspace1");
@@ -264,4 +427,35 @@ public class StorageServiceServerTest
assert primaryRanges.size() == 1;
assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
}
+
+ /* It does not make much sense to use -local and -pr with SimpleStrategy, but test it anyway to guard against human error */
+ @Test
+ public void testPrimaryRangeForEndpointWithinDCWithSimpleStrategy() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("replication_factor", "2");
+
+ Keyspace.clear("Keyspace1");
+ KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "SimpleStrategy", configOptions, false);
+ Schema.instance.setKeyspaceDefinition(meta);
+
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
+
+ primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3"));
+ assert primaryRanges.size() == 1;
+ assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
+ }
}
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87ba0dc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87ba0dc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87ba0dc1
Branch: refs/heads/trunk
Commit: 87ba0dc1bce26ab97f5dd26edfd4a5c91ef9df48
Parents: 50c27cc 125b464
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Sep 17 09:48:14 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:48:14 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 92 +++++---
.../org/apache/cassandra/tools/NodeTool.java | 2 +-
.../service/StorageServiceServerTest.java | 210 ++++++++++++++++++-
3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87ba0dc1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87ba0dc1/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------