You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/03 19:01:35 UTC

[4/5] git commit: update NTS calculateNaturalEndpoints to be O(N log N) patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881

update NTS calculateNaturalEndpoints to be O(N log N)
patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9688a79d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9688a79d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9688a79d

Branch: refs/heads/trunk
Commit: 9688a79d0c315395772d15d92e051d00e18b966b
Parents: 893d1da
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 3 11:58:23 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 3 11:58:23 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/locator/NetworkTopologyStrategy.java |  123 ++++++++++-----
 .../locator/NetworkTopologyStrategyTest.java       |   62 +++++++-
 3 files changed, 140 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f36e9f..4ce9884 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
  * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
  * split up rpc timeout by operation type (CASSANDRA-2819)
  * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 7b2ec91..30629d8 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -27,8 +27,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.Multimap;
 
 /**
  * This Replication Strategy takes a property file that gives the intended
@@ -71,59 +73,96 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         logger.debug("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
     }
 
+    /**
+     * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc.
+     */
+    @SuppressWarnings("serial")
     public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
     {
-        List<InetAddress> endpoints = new ArrayList<InetAddress>(getReplicationFactor());
-
-        for (Entry<String, Integer> dcEntry : datacenters.entrySet())
+        Set<InetAddress> replicas = new HashSet<InetAddress>();
+        // replicas we have found in each DC
+        Map<String, Set<InetAddress>> dcReplicas = new HashMap<String, Set<InetAddress>>(datacenters.size())
+        {{
+            for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+                put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+        }};
+        Topology topology = tokenMetadata.getTopology();
+        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        // all racks in a DC so we can check when we have exhausted all racks in a DC
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+        // tracks the racks we have already placed replicas in
+        Map<String, Set<String>> seenRacks = new HashMap<String, Set<String>>(datacenters.size())
+        {{
+            for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+                put(dc.getKey(), new HashSet<String>());
+        }};
+        // tracks the endpoints that we skipped over while looking for unique racks
+        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<String, Set<InetAddress>>(datacenters.size())
+        {{
+            for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+                put(dc.getKey(), new LinkedHashSet<InetAddress>());
+        }};
+        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
         {
-            String dcName = dcEntry.getKey();
-            int dcReplicas = dcEntry.getValue();
-
-            // collect endpoints in this DC; add in bulk to token meta data for computational complexity
-            // reasons (CASSANDRA-3831).
-            Set<Pair<Token, InetAddress>> dcTokensToUpdate = new HashSet<Pair<Token, InetAddress>>();
-            for (Entry<Token, InetAddress> tokenEntry : tokenMetadata.getTokenToEndpointMapForReading().entrySet())
+            Token next = tokenIter.next();
+            InetAddress ep = tokenMetadata.getEndpoint(next);
+            String dc = snitch.getDatacenter(ep);
+            // have we already found all replicas for this dc?
+            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+                continue;
+            // can we skip checking the rack?
+            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
             {
-                if (snitch.getDatacenter(tokenEntry.getValue()).equals(dcName))
-                    dcTokensToUpdate.add(Pair.create(tokenEntry.getKey(), tokenEntry.getValue()));
+                dcReplicas.get(dc).add(ep);
+                replicas.add(ep);
             }
-            TokenMetadata dcTokens = new TokenMetadata();
-            dcTokens.updateNormalTokens(dcTokensToUpdate);
-
-            List<InetAddress> dcEndpoints = new ArrayList<InetAddress>(dcReplicas);
-            Set<String> racks = new HashSet<String>();
-            // first pass: only collect replicas on unique racks
-            for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false);
-                 dcEndpoints.size() < dcReplicas && iter.hasNext(); )
+            else
             {
-                Token token = iter.next();
-                InetAddress endpoint = dcTokens.getEndpoint(token);
-                String rack = snitch.getRack(endpoint);
-                if (!racks.contains(rack))
+                String rack = snitch.getRack(ep);
+                // is this a new rack?
+                if (seenRacks.get(dc).contains(rack))
                 {
-                    dcEndpoints.add(endpoint);
-                    racks.add(rack);
+                    skippedDcEndpoints.get(dc).add(ep);
+                }
+                else
+                {
+                    dcReplicas.get(dc).add(ep);
+                    replicas.add(ep);
+                    seenRacks.get(dc).add(rack);
+                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+                    {
+                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+                        {
+                            InetAddress nextSkipped = skippedIt.next();
+                            dcReplicas.get(dc).add(nextSkipped);
+                            replicas.add(nextSkipped);
+                        }
+                    }
                 }
             }
+        }
 
-            // second pass: if replica count has not been achieved from unique racks, add nodes from duplicate racks
-            for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false);
-                 dcEndpoints.size() < dcReplicas && iter.hasNext(); )
-            {
-                Token token = iter.next();
-                InetAddress endpoint = dcTokens.getEndpoint(token);
-                if (!dcEndpoints.contains(endpoint))
-                    dcEndpoints.add(endpoint);
-            }
+        return new ArrayList<InetAddress>(replicas);
+    }
 
-            if (logger.isDebugEnabled())
-                logger.debug("{} endpoints in datacenter {} for token {} ",
-                             new Object[] { StringUtils.join(dcEndpoints, ","), dcName, searchToken});
-            endpoints.addAll(dcEndpoints);
-        }
+    private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    {
+        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc));
+    }
 
-        return endpoints;
+    private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    {
+        for (String dc : datacenters.keySet())
+            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+                return false;
+        return true;
     }
 
     public int getReplicationFactor()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 04cffbb..9e3d684 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -25,24 +25,32 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import javax.xml.parsers.ParserConfigurationException;
+import java.util.Set;
+
+import junit.framework.Assert;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.dht.Token;
-import org.xml.sax.SAXException;
+import org.apache.cassandra.utils.Pair;
 
 public class NetworkTopologyStrategyTest
 {
     private String table = "Keyspace1";
+    private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class);
 
     @Test
-    public void testProperties() throws IOException, ParserConfigurationException, SAXException, ConfigurationException
+    public void testProperties() throws IOException, ConfigurationException
     {
         IEndpointSnitch snitch = new PropertyFileSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
         TokenMetadata metadata = new TokenMetadata();
         createDummyTokens(metadata, true);
 
@@ -63,9 +71,10 @@ public class NetworkTopologyStrategyTest
     }
 
     @Test
-    public void testPropertiesWithEmptyDC() throws IOException, ParserConfigurationException, SAXException, ConfigurationException
+    public void testPropertiesWithEmptyDC() throws IOException, ConfigurationException
     {
         IEndpointSnitch snitch = new PropertyFileSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
         TokenMetadata metadata = new TokenMetadata();
         createDummyTokens(metadata, false);
 
@@ -85,6 +94,51 @@ public class NetworkTopologyStrategyTest
         assert 6 == new HashSet<InetAddress>(endpoints).size(); // ensure uniqueness
     }
 
+    @Test
+    public void testLargeCluster() throws UnknownHostException, ConfigurationException
+    {
+        int[] dcRacks = new int[]{2, 4, 8};
+        int[] dcEndpoints = new int[]{128, 256, 512};
+        int[] dcReplication = new int[]{2, 6, 6};
+
+        IEndpointSnitch snitch = new RackInferringSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+        TokenMetadata metadata = new TokenMetadata();
+        Map<String, String> configOptions = new HashMap<String, String>();
+        Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>();
+
+        int totalRF = 0;
+        for (int dc = 0; dc < dcRacks.length; ++dc)
+        {
+            totalRF += dcReplication[dc];
+            configOptions.put(Integer.toString(dc), Integer.toString(dcReplication[dc]));
+            for (int rack = 0; rack < dcRacks[dc]; ++rack)
+            {
+                for (int ep = 1; ep <= dcEndpoints[dc]/dcRacks[dc]; ++ep)
+                {
+                    byte[] ipBytes = new byte[]{10, (byte)dc, (byte)rack, (byte)ep};
+                    InetAddress address = InetAddress.getByAddress(ipBytes);
+                    StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc));
+                    logger.debug("adding node " + address + " at " + token);
+                    tokens.add(new Pair<Token, InetAddress>(token, address));
+                }
+            }
+        }
+        metadata.updateNormalTokens(tokens);
+
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(table, metadata, snitch, configOptions);
+
+        for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
+        {
+            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata);
+            Set<InetAddress> epSet = new HashSet<InetAddress>(endpoints);
+
+            Assert.assertEquals(totalRF, endpoints.size());
+            Assert.assertEquals(totalRF, epSet.size());
+            logger.debug(testToken + ": " + endpoints.toString());
+        }
+    }
+
     public void createDummyTokens(TokenMetadata metadata, boolean populateDC3) throws UnknownHostException
     {
         // DC 1