You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/03 19:01:35 UTC
[4/5] git commit: update NTS calculateNaturalEndpoints to be O(N log N)
patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881
update NTS calculateNaturalEndpoints to be O(N log N)
patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9688a79d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9688a79d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9688a79d
Branch: refs/heads/trunk
Commit: 9688a79d0c315395772d15d92e051d00e18b966b
Parents: 893d1da
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 3 11:58:23 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 3 11:58:23 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/locator/NetworkTopologyStrategy.java | 123 ++++++++++-----
.../locator/NetworkTopologyStrategyTest.java | 62 +++++++-
3 files changed, 140 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f36e9f..4ce9884 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-dev
+ * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
* add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
* split up rpc timeout by operation type (CASSANDRA-2819)
* rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 7b2ec91..30629d8 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -27,8 +27,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.Multimap;
/**
* This Replication Strategy takes a property file that gives the intended
@@ -71,59 +73,96 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
logger.debug("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
}
+ /**
+ * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc.
+ */
+ @SuppressWarnings("serial")
public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
{
- List<InetAddress> endpoints = new ArrayList<InetAddress>(getReplicationFactor());
-
- for (Entry<String, Integer> dcEntry : datacenters.entrySet())
+ Set<InetAddress> replicas = new HashSet<InetAddress>();
+ // replicas we have found in each DC
+ Map<String, Set<InetAddress>> dcReplicas = new HashMap<String, Set<InetAddress>>(datacenters.size())
+ {{
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+ }};
+ Topology topology = tokenMetadata.getTopology();
+ // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+ Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ // all racks in a DC so we can check when we have exhausted all racks in a DC
+ Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+ // tracks the racks we have already placed replicas in
+ Map<String, Set<String>> seenRacks = new HashMap<String, Set<String>>(datacenters.size())
+ {{
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ put(dc.getKey(), new HashSet<String>());
+ }};
+ // tracks the endpoints that we skipped over while looking for unique racks
+ // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+ Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<String, Set<InetAddress>>(datacenters.size())
+ {{
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ put(dc.getKey(), new LinkedHashSet<InetAddress>());
+ }};
+ Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+ while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
{
- String dcName = dcEntry.getKey();
- int dcReplicas = dcEntry.getValue();
-
- // collect endpoints in this DC; add in bulk to token meta data for computational complexity
- // reasons (CASSANDRA-3831).
- Set<Pair<Token, InetAddress>> dcTokensToUpdate = new HashSet<Pair<Token, InetAddress>>();
- for (Entry<Token, InetAddress> tokenEntry : tokenMetadata.getTokenToEndpointMapForReading().entrySet())
+ Token next = tokenIter.next();
+ InetAddress ep = tokenMetadata.getEndpoint(next);
+ String dc = snitch.getDatacenter(ep);
+ // have we already found all replicas for this dc?
+ if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+ continue;
+ // can we skip checking the rack?
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
{
- if (snitch.getDatacenter(tokenEntry.getValue()).equals(dcName))
- dcTokensToUpdate.add(Pair.create(tokenEntry.getKey(), tokenEntry.getValue()));
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
}
- TokenMetadata dcTokens = new TokenMetadata();
- dcTokens.updateNormalTokens(dcTokensToUpdate);
-
- List<InetAddress> dcEndpoints = new ArrayList<InetAddress>(dcReplicas);
- Set<String> racks = new HashSet<String>();
- // first pass: only collect replicas on unique racks
- for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false);
- dcEndpoints.size() < dcReplicas && iter.hasNext(); )
+ else
{
- Token token = iter.next();
- InetAddress endpoint = dcTokens.getEndpoint(token);
- String rack = snitch.getRack(endpoint);
- if (!racks.contains(rack))
+ String rack = snitch.getRack(ep);
+ // is this a new rack?
+ if (seenRacks.get(dc).contains(rack))
{
- dcEndpoints.add(endpoint);
- racks.add(rack);
+ skippedDcEndpoints.get(dc).add(ep);
+ }
+ else
+ {
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
+ seenRacks.get(dc).add(rack);
+ // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+ {
+ Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+ while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+ {
+ InetAddress nextSkipped = skippedIt.next();
+ dcReplicas.get(dc).add(nextSkipped);
+ replicas.add(nextSkipped);
+ }
+ }
}
}
+ }
- // second pass: if replica count has not been achieved from unique racks, add nodes from duplicate racks
- for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false);
- dcEndpoints.size() < dcReplicas && iter.hasNext(); )
- {
- Token token = iter.next();
- InetAddress endpoint = dcTokens.getEndpoint(token);
- if (!dcEndpoints.contains(endpoint))
- dcEndpoints.add(endpoint);
- }
+ return new ArrayList<InetAddress>(replicas);
+ }
- if (logger.isDebugEnabled())
- logger.debug("{} endpoints in datacenter {} for token {} ",
- new Object[] { StringUtils.join(dcEndpoints, ","), dcName, searchToken});
- endpoints.addAll(dcEndpoints);
- }
+ private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+ {
+ return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc));
+ }
- return endpoints;
+ private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+ {
+ for (String dc : datacenters.keySet())
+ if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints))
+ return false;
+ return true;
}
public int getReplicationFactor()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 04cffbb..9e3d684 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -25,24 +25,32 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import javax.xml.parsers.ParserConfigurationException;
+import java.util.Set;
+
+import junit.framework.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.StringToken;
import org.apache.cassandra.dht.Token;
-import org.xml.sax.SAXException;
+import org.apache.cassandra.utils.Pair;
public class NetworkTopologyStrategyTest
{
private String table = "Keyspace1";
+ private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class);
@Test
- public void testProperties() throws IOException, ParserConfigurationException, SAXException, ConfigurationException
+ public void testProperties() throws IOException, ConfigurationException
{
IEndpointSnitch snitch = new PropertyFileSnitch();
+ DatabaseDescriptor.setEndpointSnitch(snitch);
TokenMetadata metadata = new TokenMetadata();
createDummyTokens(metadata, true);
@@ -63,9 +71,10 @@ public class NetworkTopologyStrategyTest
}
@Test
- public void testPropertiesWithEmptyDC() throws IOException, ParserConfigurationException, SAXException, ConfigurationException
+ public void testPropertiesWithEmptyDC() throws IOException, ConfigurationException
{
IEndpointSnitch snitch = new PropertyFileSnitch();
+ DatabaseDescriptor.setEndpointSnitch(snitch);
TokenMetadata metadata = new TokenMetadata();
createDummyTokens(metadata, false);
@@ -85,6 +94,51 @@ public class NetworkTopologyStrategyTest
assert 6 == new HashSet<InetAddress>(endpoints).size(); // ensure uniqueness
}
+ @Test
+ public void testLargeCluster() throws UnknownHostException, ConfigurationException
+ {
+ int[] dcRacks = new int[]{2, 4, 8};
+ int[] dcEndpoints = new int[]{128, 256, 512};
+ int[] dcReplication = new int[]{2, 6, 6};
+
+ IEndpointSnitch snitch = new RackInferringSnitch();
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+ TokenMetadata metadata = new TokenMetadata();
+ Map<String, String> configOptions = new HashMap<String, String>();
+ Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>();
+
+ int totalRF = 0;
+ for (int dc = 0; dc < dcRacks.length; ++dc)
+ {
+ totalRF += dcReplication[dc];
+ configOptions.put(Integer.toString(dc), Integer.toString(dcReplication[dc]));
+ for (int rack = 0; rack < dcRacks[dc]; ++rack)
+ {
+ for (int ep = 1; ep <= dcEndpoints[dc]/dcRacks[dc]; ++ep)
+ {
+ byte[] ipBytes = new byte[]{10, (byte)dc, (byte)rack, (byte)ep};
+ InetAddress address = InetAddress.getByAddress(ipBytes);
+ StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc));
+ logger.debug("adding node " + address + " at " + token);
+ tokens.add(new Pair<Token, InetAddress>(token, address));
+ }
+ }
+ }
+ metadata.updateNormalTokens(tokens);
+
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(table, metadata, snitch, configOptions);
+
+ for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
+ {
+ List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata);
+ Set<InetAddress> epSet = new HashSet<InetAddress>(endpoints);
+
+ Assert.assertEquals(totalRF, endpoints.size());
+ Assert.assertEquals(totalRF, epSet.size());
+ logger.debug(testToken + ": " + endpoints.toString());
+ }
+ }
+
public void createDummyTokens(TokenMetadata metadata, boolean populateDC3) throws UnknownHostException
{
// DC 1