You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/09/17 16:48:47 UTC

[1/3] git commit: Make repair -pr work with -local

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 90a211455 -> 125b4642d
  refs/heads/trunk 50c27cca4 -> 87ba0dc1b


Make repair -pr work with -local

patch by Paulo Motta; reviewed by yukim for CASSANDRA-7450


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/125b4642
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/125b4642
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/125b4642

Branch: refs/heads/cassandra-2.1
Commit: 125b4642db63a37c560dec4615ea04662c6e45ef
Parents: 90a2114
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 17 09:46:52 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:46:52 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/StorageService.java       |  92 +++++---
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +-
 .../service/StorageServiceServerTest.java       | 210 ++++++++++++++++++-
 3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1aa3b24..b1fe20e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -43,7 +43,6 @@ import ch.qos.logback.classic.jmx.JMXConfiguratorMBean;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.FutureCallback;
@@ -156,11 +155,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
     }
 
-    public Collection<Range<Token>> getLocalPrimaryRanges(String keyspace)
+    public Collection<Range<Token>> getPrimaryRanges(String keyspace)
     {
         return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
     }
 
+    public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
+    {
+        return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+    }
+
     private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
     private CassandraDaemon daemon;
 
@@ -332,7 +336,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        
+
         try
         {
             daemon.nativeServer.start();
@@ -430,10 +434,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
-            
+
             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
-            return tokens;        
+            return tokens;
         }
         catch (IOException e)
         {
@@ -2464,12 +2468,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        // when repairing only primary range, dataCenter nor hosts can be set
-        if (primaryRange && (dataCenters != null || hosts != null))
+        Collection<Range<Token>> ranges;
+        if (primaryRange)
+        {
+            // when repairing only primary range, neither dataCenters nor hosts can be set
+            if (dataCenters == null && hosts == null)
+                ranges = getPrimaryRanges(keyspace);
+            // except when dataCenters contains only the local DC (i.e. -local)
+            else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+                ranges = getPrimaryRangesWithinDC(keyspace);
+            else
+                throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
+        else
         {
-            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+             ranges = getLocalRanges(keyspace);
         }
-        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 
         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
     }
@@ -2494,12 +2508,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
     {
-        // when repairing only primary range, you cannot repair only on local DC
-        if (primaryRange && isLocal)
+        Collection<Range<Token>> ranges;
+        if (primaryRange)
+        {
+            ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
+        }
+        else
         {
-            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+            ranges = getLocalRanges(keyspace);
         }
-        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+
         return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
     }
 
@@ -2723,7 +2741,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * The node that stores replica primarily is defined as the first node returned
      * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
      *
-     * @param keyspace
+     * @param keyspace Keyspace name to check primary ranges
      * @param ep endpoint we are interested in.
      * @return primary ranges for the specified endpoint.
      */
@@ -2742,20 +2760,38 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Previously, primary range is the range that the node is responsible for and calculated
-     * only from the token assigned to the node.
-     * But this does not take replication strategy into account, and therefore returns insufficient
-     * range especially using NTS with replication only to certain DC(see CASSANDRA-5424).
+     * Get the "primary ranges" within local DC for the specified keyspace and endpoint.
      *
-     * @deprecated
-     * @param ep endpoint we are interested in.
-     * @return range for the specified endpoint.
+     * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+     * @param keyspace Keyspace name to check primary ranges
+     * @param referenceEndpoint endpoint we are interested in.
+     * @return primary ranges within local DC for the specified endpoint.
      */
-    @Deprecated
-    @VisibleForTesting
-    public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
+    public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
     {
-        return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
+        TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
+        Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+        AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+        Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
+        for (Token token : metadata.sortedTokens())
+        {
+            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+            for (InetAddress endpoint : endpoints)
+            {
+                if (localDcNodes.contains(endpoint))
+                {
+                    if (endpoint.equals(referenceEndpoint))
+                    {
+                        localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
+                    }
+                    break;
+                }
+            }
+        }
+
+        return localDCPrimaryRanges;
     }
 
     /**
@@ -2861,7 +2897,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // if both classQualifer and rawLevel are empty, reload from configuration
         if (StringUtils.isBlank(classQualifier) && StringUtils.isBlank(rawLevel) )
         {
-            JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(), 
+            JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
                     new ObjectName("ch.qos.logback.classic:Name=default,Type=ch.qos.logback.classic.jmx.JMXConfigurator"),
                     JMXConfiguratorMBean.class);
             jmxConfiguratorMBean.reloadDefaultConfiguration();
@@ -2879,7 +2915,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logBackLogger.setLevel(level);
         logger.info("set log level to {} for classes under '{}' (if the level doesn't look like '{}' then the logger couldn't parse '{}')", level, classQualifier, rawLevel, rawLevel);
     }
-    
+
     /**
      * @return the runtime logging levels for all the configured loggers
      */
@@ -3228,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                     {
                                         oldEndpoints.removeAll(newEndpoints);
 
-                                        //No relocation required 
+                                        //No relocation required
                                         if (oldEndpoints.isEmpty())
                                             continue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 91f9bf0..1d7b1ad 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1674,7 +1674,7 @@ public class NodeTool
             List<String> keyspaces = parseOptionalKeyspace(args, probe);
             String[] cfnames = parseOptionalColumnFamilies(args);
 
-            if (primaryRange && (localDC || !specificHosts.isEmpty() || !specificHosts.isEmpty()))
+            if (primaryRange && (!specificDataCenters.isEmpty() || !specificHosts.isEmpty()))
                 throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
 
             for (String keyspace : keyspaces)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 81c5261..84b7a3c 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
@@ -39,8 +41,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.PropertyFileSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -81,14 +81,14 @@ public class StorageServiceServerTest
     public void testGetAllRangesEmpty()
     {
         List<Token> toks = Collections.emptyList();
-        assertEquals(Collections.emptyList(), StorageService.instance.getAllRanges(toks));
+        assertEquals(Collections.<Range<Token>>emptyList(), StorageService.instance.getAllRanges(toks));
     }
 
     @Test
     public void testSnapshot() throws IOException
     {
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
-        StorageService.instance.takeSnapshot("snapshot", new String[0]);
+        StorageService.instance.takeSnapshot("snapshot");
     }
 
     @Test
@@ -99,6 +99,50 @@ public class StorageServiceServerTest
     }
 
     @Test
+    public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "1");
+        configOptions.put("DC2", "1");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+                                                                                                            InetAddress.getByName("127.0.0.1"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+    }
+
+    @Test
     public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -110,7 +154,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
         metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC1", "1");
         configOptions.put("DC2", "1");
 
@@ -147,7 +191,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
         metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
 
         Keyspace.clear("Keyspace1");
@@ -174,6 +218,45 @@ public class StorageServiceServerTest
     }
 
     @Test
+    public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategyOneDCOnly() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC2", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        // endpoints in DC1 should not have primary range
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assertTrue(primaryRanges.isEmpty());
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+                                                                                   InetAddress.getByName("127.0.0.2"));
+        assertTrue(primaryRanges.isEmpty());
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertTrue(primaryRanges.size() == 2);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertTrue(primaryRanges.size() == 2);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+    }
+
+    @Test
     public void testPrimaryRangesWithVnodes() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -197,7 +280,7 @@ public class StorageServiceServerTest
         dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
         metadata.updateNormalTokens(dc2);
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
 
         Keyspace.clear("Keyspace1");
@@ -235,6 +318,86 @@ public class StorageServiceServerTest
         assert primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J")));
     }
+
+    @Test
+    public void testPrimaryRangeForEndpointWithinDCWithVnodes() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        Multimap<InetAddress, Token> dc1 = HashMultimap.create();
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+        metadata.updateNormalTokens(dc1);
+
+        // DC2
+        Multimap<InetAddress, Token> dc2 = HashMultimap.create();
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+        metadata.updateNormalTokens(dc2);
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "1");
+        configOptions.put("DC2", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        // endpoints in DC1 should have primary ranges which also cover DC2
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assertEquals(8, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+
+        // endpoints in DC1 should have primary ranges which also cover DC2
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assertEquals(4, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertEquals(4, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+        // because /127.0.0.4 holds token "B", which is the next token after "A" from /127.0.0.1,
+        // the node covers range (L, A]
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertTrue(primaryRanges.size() == 8);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+        // ranges from /127.0.0.1
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+        // the next token after "H" in DC2 is "K" on /127.0.0.5, so (G, H] goes to /127.0.0.5
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+        // ranges from /127.0.0.2
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+    }
+
     @Test
     public void testPrimaryRangesWithSimpleStrategy() throws Exception
     {
@@ -245,7 +408,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
         metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("replication_factor", "2");
 
         Keyspace.clear("Keyspace1");
@@ -264,4 +427,35 @@ public class StorageServiceServerTest
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
     }
+
+    /* It does not make much sense to use -local and -pr with SimpleStrategy, but cover it anyway to guard against human error */
+    @Test
+    public void testPrimaryRangeForEndpointWithinDCWithSimpleStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("replication_factor", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "SimpleStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
+    }
 }


[2/3] git commit: Make repair -pr work with -local

Posted by yu...@apache.org.
Make repair -pr work with -local

patch by Paulo Motta; reviewed by yukim for CASSANDRA-7450


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/125b4642
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/125b4642
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/125b4642

Branch: refs/heads/trunk
Commit: 125b4642db63a37c560dec4615ea04662c6e45ef
Parents: 90a2114
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 17 09:46:52 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:46:52 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/StorageService.java       |  92 +++++---
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +-
 .../service/StorageServiceServerTest.java       | 210 ++++++++++++++++++-
 3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1aa3b24..b1fe20e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -43,7 +43,6 @@ import ch.qos.logback.classic.jmx.JMXConfiguratorMBean;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.FutureCallback;
@@ -156,11 +155,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
     }
 
-    public Collection<Range<Token>> getLocalPrimaryRanges(String keyspace)
+    public Collection<Range<Token>> getPrimaryRanges(String keyspace)
     {
         return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
     }
 
+    public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
+    {
+        return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+    }
+
     private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
     private CassandraDaemon daemon;
 
@@ -332,7 +336,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        
+
         try
         {
             daemon.nativeServer.start();
@@ -430,10 +434,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
-            
+
             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
-            return tokens;        
+            return tokens;
         }
         catch (IOException e)
         {
@@ -2464,12 +2468,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        // when repairing only primary range, dataCenter nor hosts can be set
-        if (primaryRange && (dataCenters != null || hosts != null))
+        Collection<Range<Token>> ranges;
+        if (primaryRange)
+        {
+            // when repairing only primary range, neither dataCenters nor hosts can be set
+            if (dataCenters == null && hosts == null)
+                ranges = getPrimaryRanges(keyspace);
+            // except when dataCenters contains only the local DC (i.e. -local)
+            else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+                ranges = getPrimaryRangesWithinDC(keyspace);
+            else
+                throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
+        else
         {
-            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+             ranges = getLocalRanges(keyspace);
         }
-        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 
         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
     }
@@ -2494,12 +2508,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
     {
-        // when repairing only primary range, you cannot repair only on local DC
-        if (primaryRange && isLocal)
+        Collection<Range<Token>> ranges;
+        if (primaryRange)
+        {
+            ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
+        }
+        else
         {
-            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+            ranges = getLocalRanges(keyspace);
         }
-        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+
         return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
     }
 
@@ -2723,7 +2741,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * The node that stores replica primarily is defined as the first node returned
      * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
      *
-     * @param keyspace
+     * @param keyspace Keyspace name to check primary ranges
      * @param ep endpoint we are interested in.
      * @return primary ranges for the specified endpoint.
      */
@@ -2742,20 +2760,38 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Previously, primary range is the range that the node is responsible for and calculated
-     * only from the token assigned to the node.
-     * But this does not take replication strategy into account, and therefore returns insufficient
-     * range especially using NTS with replication only to certain DC(see CASSANDRA-5424).
+     * Get the "primary ranges" within local DC for the specified keyspace and endpoint.
      *
-     * @deprecated
-     * @param ep endpoint we are interested in.
-     * @return range for the specified endpoint.
+     * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+     * @param keyspace Keyspace name to check primary ranges
+     * @param referenceEndpoint endpoint we are interested in.
+     * @return primary ranges within local DC for the specified endpoint.
      */
-    @Deprecated
-    @VisibleForTesting
-    public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
+    public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
     {
-        return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
+        TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
+        Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+        AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
+
+        Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
+        for (Token token : metadata.sortedTokens())
+        {
+            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+            for (InetAddress endpoint : endpoints)
+            {
+                if (localDcNodes.contains(endpoint))
+                {
+                    if (endpoint.equals(referenceEndpoint))
+                    {
+                        localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
+                    }
+                    break;
+                }
+            }
+        }
+
+        return localDCPrimaryRanges;
     }
 
     /**
@@ -2861,7 +2897,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // if both classQualifer and rawLevel are empty, reload from configuration
         if (StringUtils.isBlank(classQualifier) && StringUtils.isBlank(rawLevel) )
         {
-            JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(), 
+            JMXConfiguratorMBean jmxConfiguratorMBean = JMX.newMBeanProxy(ManagementFactory.getPlatformMBeanServer(),
                     new ObjectName("ch.qos.logback.classic:Name=default,Type=ch.qos.logback.classic.jmx.JMXConfigurator"),
                     JMXConfiguratorMBean.class);
             jmxConfiguratorMBean.reloadDefaultConfiguration();
@@ -2879,7 +2915,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logBackLogger.setLevel(level);
         logger.info("set log level to {} for classes under '{}' (if the level doesn't look like '{}' then the logger couldn't parse '{}')", level, classQualifier, rawLevel, rawLevel);
     }
-    
+
     /**
      * @return the runtime logging levels for all the configured loggers
      */
@@ -3228,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                     {
                                         oldEndpoints.removeAll(newEndpoints);
 
-                                        //No relocation required 
+                                        //No relocation required
                                         if (oldEndpoints.isEmpty())
                                             continue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 91f9bf0..1d7b1ad 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1674,7 +1674,7 @@ public class NodeTool
             List<String> keyspaces = parseOptionalKeyspace(args, probe);
             String[] cfnames = parseOptionalColumnFamilies(args);
 
-            if (primaryRange && (localDC || !specificHosts.isEmpty() || !specificHosts.isEmpty()))
+            if (primaryRange && (!specificDataCenters.isEmpty() || !specificHosts.isEmpty()))
                 throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
 
             for (String keyspace : keyspaces)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/125b4642/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 81c5261..84b7a3c 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
@@ -39,8 +41,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.PropertyFileSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -81,14 +81,14 @@ public class StorageServiceServerTest
     public void testGetAllRangesEmpty()
     {
         List<Token> toks = Collections.emptyList();
-        assertEquals(Collections.emptyList(), StorageService.instance.getAllRanges(toks));
+        assertEquals(Collections.<Range<Token>>emptyList(), StorageService.instance.getAllRanges(toks));
     }
 
     @Test
     public void testSnapshot() throws IOException
     {
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
-        StorageService.instance.takeSnapshot("snapshot", new String[0]);
+        StorageService.instance.takeSnapshot("snapshot");
     }
 
     @Test
@@ -99,6 +99,50 @@ public class StorageServiceServerTest
     }
 
     @Test
+    public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "1");
+        configOptions.put("DC2", "1");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+                                                                                                            InetAddress.getByName("127.0.0.1"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertEquals(2, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+    }
+
+    @Test
     public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -110,7 +154,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
         metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC1", "1");
         configOptions.put("DC2", "1");
 
@@ -147,7 +191,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
         metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
 
         Keyspace.clear("Keyspace1");
@@ -174,6 +218,45 @@ public class StorageServiceServerTest
     }
 
     @Test
+    public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategyOneDCOnly() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC2", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        // endpoints in DC1 should not have primary range
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assertTrue(primaryRanges.isEmpty());
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
+                                                                                   InetAddress.getByName("127.0.0.2"));
+        assertTrue(primaryRanges.isEmpty());
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertTrue(primaryRanges.size() == 2);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertTrue(primaryRanges.size() == 2);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+    }
+
+    @Test
     public void testPrimaryRangesWithVnodes() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -197,7 +280,7 @@ public class StorageServiceServerTest
         dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
         metadata.updateNormalTokens(dc2);
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
 
         Keyspace.clear("Keyspace1");
@@ -235,6 +318,86 @@ public class StorageServiceServerTest
         assert primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J")));
     }
+
+    @Test
+    public void testPrimaryRangeForEndpointWithinDCWithVnodes() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        Multimap<InetAddress, Token> dc1 = HashMultimap.create();
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+        metadata.updateNormalTokens(dc1);
+
+        // DC2
+        Multimap<InetAddress, Token> dc2 = HashMultimap.create();
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+        metadata.updateNormalTokens(dc2);
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "1");
+        configOptions.put("DC2", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        // endpoints in DC1 should have primary ranges which also cover DC2
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assertEquals(8, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+
+        // endpoints in DC1 should have primary ranges which also cover DC2
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assertEquals(4, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        assertEquals(4, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
+        // because /127.0.0.4 holds token "B" which is the next to token "A" from /127.0.0.1,
+        // the node covers range (L, A]
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        assertTrue(primaryRanges.size() == 8);
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
+        // ranges from /127.0.0.1
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("E"))));
+        // the next token to "H" in DC2 is "K" in /127.0.0.5, so (G, H] goes to /127.0.0.5
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
+        // ranges from /127.0.0.2
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("H"), new StringToken("I"))));
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
+    }
+
     @Test
     public void testPrimaryRangesWithSimpleStrategy() throws Exception
     {
@@ -245,7 +408,7 @@ public class StorageServiceServerTest
         metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
         metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
 
-        Map<String, String> configOptions = new HashMap<String, String>();
+        Map<String, String> configOptions = new HashMap<>();
         configOptions.put("replication_factor", "2");
 
         Keyspace.clear("Keyspace1");
@@ -264,4 +427,35 @@ public class StorageServiceServerTest
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
     }
+
+    /* It does not make much sense to use -local and -pr with SimpleStrategy, but this guards against human error */
+    @Test
+    public void testPrimaryRangeForEndpointWithinDCWithSimpleStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("replication_factor", "2");
+
+        Keyspace.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", "SimpleStrategy", configOptions, false);
+        Schema.instance.setKeyspaceDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
+
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
+    }
 }


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87ba0dc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87ba0dc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87ba0dc1

Branch: refs/heads/trunk
Commit: 87ba0dc1bce26ab97f5dd26edfd4a5c91ef9df48
Parents: 50c27cc 125b464
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Sep 17 09:48:14 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Sep 17 09:48:14 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/StorageService.java       |  92 +++++---
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +-
 .../service/StorageServiceServerTest.java       | 210 ++++++++++++++++++-
 3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87ba0dc1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87ba0dc1/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------