You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/02/01 11:29:38 UTC

[cassandra] 01/01: Merge branch cassandra-3.0 into cassandra-3.11

This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 16c96c20dadfbda98d4b5daf7f6c169b691459b9
Merge: 2d05bff bc18b4d
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Fri Feb 1 12:26:43 2019 +0100

    Merge branch cassandra-3.0 into cassandra-3.11

 .../org/apache/cassandra/db/ColumnFamilyStore.java |   1 -
 .../db/compaction/AbstractCompactionStrategy.java  |  14 +-
 .../compaction/DateTieredCompactionStrategy.java   |  20 +-
 .../compaction/SizeTieredCompactionStrategy.java   |   6 +-
 .../compaction/TimeWindowCompactionStrategy.java   |   4 +-
 .../apache/cassandra/db/marshal/TypeParser.java    |  30 +-
 .../cassandra/locator/NetworkTopologyStrategy.java |   3 +-
 .../apache/cassandra/locator/TokenMetadata.java    | 364 ++++++++++++---------
 .../locator/NetworkTopologyStrategyTest.java       |   3 +-
 .../cassandra/locator/TokenMetadataTest.java       |  23 +-
 10 files changed, 280 insertions(+), 188 deletions(-)

diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index bb9f4b9,7c38fa8..83d77a9
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -105,10 -89,10 +105,14 @@@ public class DateTieredCompactionStrate
       */
      private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
      {
-         if (sstables.isEmpty())
 -        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
--            return Collections.emptyList();
++        Set<SSTableReader> uncompacting;
++        synchronized (sstables)
++        {
++            if (sstables.isEmpty())
++                return Collections.emptyList();
  
--        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
++            uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
++        }
  
          Set<SSTableReader> expired = Collections.emptySet();
          // we only check for expired sstables every 10 minutes (by default) due to it being an expensive operation
diff --cc src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 78af800,590eea3..7416d49
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@@ -23,7 -23,9 +23,10 @@@ import java.lang.reflect.Method
  import java.nio.ByteBuffer;
  import java.util.*;
  
+ import com.google.common.base.Verify;
+ import com.google.common.collect.ImmutableMap;
+ 
 +import org.apache.cassandra.cql3.FieldIdentifier;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
diff --cc src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 756b689,82183bb..d48dec3
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@@ -28,8 -28,8 +28,9 @@@ import org.apache.cassandra.exceptions.
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.locator.TokenMetadata.Topology;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
  
+ import com.google.common.collect.ImmutableMultimap;
  import com.google.common.collect.Multimap;
  
  /**
@@@ -151,29 -92,22 +152,29 @@@ public class NetworkTopologyStrategy ex
          // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
          Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
          // all racks in a DC so we can check when we have exhausted all racks in a DC
-         Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+         Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
          assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
  
 -        // tracks the racks we have already placed replicas in
 -        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
 -        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
 -            seenRacks.put(dc.getKey(), new HashSet<String>());
 +        int dcsToFill = 0;
 +        Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
  
 -        // tracks the endpoints that we skipped over while looking for unique racks
 -        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
 -        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
 -        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
 -            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
 +        // Create a DatacenterEndpoints object for each non-empty DC.
 +        for (Map.Entry<String, Integer> en : datacenters.entrySet())
 +        {
 +            String dc = en.getKey();
 +            int rf = en.getValue();
 +            int nodeCount = sizeOrZero(allEndpoints.get(dc));
 +
 +            if (rf <= 0 || nodeCount <= 0)
 +                continue;
 +
 +            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
 +            dcs.put(dc, dcEndpoints);
 +            ++dcsToFill;
 +        }
  
          Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
 -        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
 +        while (dcsToFill > 0 && tokenIter.hasNext())
          {
              Token next = tokenIter.next();
              InetAddress ep = tokenMetadata.getEndpoint(next);
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 8712916,3978eeb..8a1d9d0
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -958,20 -975,33 +975,33 @@@ public class TokenMetadat
  
      public Token getPredecessor(Token token)
      {
 -        List tokens = sortedTokens();
 +        List<Token> tokens = sortedTokens();
          int index = Collections.binarySearch(tokens, token);
-         assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+         assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
 -        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
 +        return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
      }
  
      public Token getSuccessor(Token token)
      {
 -        List tokens = sortedTokens();
 +        List<Token> tokens = sortedTokens();
          int index = Collections.binarySearch(tokens, token);
-         assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+         assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
 -        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
 +        return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
      }
  
+     private String tokenToEndpointMapKeysAsStrings()
+     {
+         lock.readLock().lock();
+         try
+         {
+             return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+         }
+         finally
+         {
+             lock.readLock().unlock();
+         }
+     }
+ 
      /** @return a copy of the bootstrapping tokens map */
      public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
      {
@@@ -1293,118 -1328,143 +1329,150 @@@
          }
  
          /**
-          * construct deep-copy of other
+          * @return map of DC to multi-map of rack to endpoints in that rack
           */
-         Topology(Topology other)
+         public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
          {
-             dcEndpoints = HashMultimap.create(other.dcEndpoints);
-             dcRacks = new HashMap<>();
-             for (String dc : other.dcRacks.keySet())
-                 dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-             currentLocations = new HashMap<>(other.currentLocations);
+             return dcRacks;
          }
  
 +        /**
-          * Stores current DC/rack assignment for ep
++         * @return The DC and rack of the given endpoint.
 +         */
-         void addEndpoint(InetAddress ep)
++        public Pair<String, String> getLocation(InetAddress addr)
 +        {
-             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-             String dc = snitch.getDatacenter(ep);
-             String rack = snitch.getRack(ep);
-             Pair<String, String> current = currentLocations.get(ep);
-             if (current != null)
-             {
-                 if (current.left.equals(dc) && current.right.equals(rack))
-                     return;
-                 doRemoveEndpoint(ep, current);
-             }
- 
-             doAddEndpoint(ep, dc, rack);
++            return currentLocations.get(addr);
 +        }
 +
-         private void doAddEndpoint(InetAddress ep, String dc, String rack)
+         Builder unbuild()
          {
-             dcEndpoints.put(dc, ep);
- 
-             if (!dcRacks.containsKey(dc))
-                 dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
-             dcRacks.get(dc).put(rack, ep);
- 
-             currentLocations.put(ep, Pair.create(dc, rack));
+             return new Builder(this);
          }
  
-         /**
-          * Removes current DC/rack assignment for ep
-          */
-         void removeEndpoint(InetAddress ep)
+         static Builder builder()
          {
-             if (!currentLocations.containsKey(ep))
-                 return;
- 
-             doRemoveEndpoint(ep, currentLocations.remove(ep));
+             return new Builder();
          }
  
-         private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+         static Topology empty()
          {
-             dcRacks.get(current.left).remove(current.right, ep);
-             dcEndpoints.remove(current.left, ep);
+             return builder().build();
          }
  
-         void updateEndpoint(InetAddress ep)
+         private static class Builder
          {
-             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-             if (snitch == null || !currentLocations.containsKey(ep))
-                 return;
+             /** multi-map of DC to endpoints in that DC */
+             private final Multimap<String, InetAddress> dcEndpoints;
+             /** map of DC to multi-map of rack to endpoints in that rack */
+             private final Map<String, Multimap<String, InetAddress>> dcRacks;
+             /** reverse-lookup map for endpoint to current known dc/rack assignment */
+             private final Map<InetAddress, Pair<String, String>> currentLocations;
  
-            updateEndpoint(ep, snitch);
-         }
+             Builder()
+             {
+                 this.dcEndpoints = HashMultimap.create();
+                 this.dcRacks = new HashMap<>();
+                 this.currentLocations = new HashMap<>();
+             }
  
-         void updateEndpoints()
-         {
-             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-             if (snitch == null)
-                 return;
+             Builder(Topology from)
+             {
+                 this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
+ 
+                 this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
+                 for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
+                     dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
+ 
+                 this.currentLocations = new HashMap<>(from.currentLocations);
+             }
+ 
+             /**
+              * Stores current DC/rack assignment for ep
+              */
+             Builder addEndpoint(InetAddress ep)
+             {
+                 IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                 String dc = snitch.getDatacenter(ep);
+                 String rack = snitch.getRack(ep);
+                 Pair<String, String> current = currentLocations.get(ep);
+                 if (current != null)
+                 {
+                     if (current.left.equals(dc) && current.right.equals(rack))
+                         return this;
+                     doRemoveEndpoint(ep, current);
+                 }
+ 
+                 doAddEndpoint(ep, dc, rack);
+                 return this;
+             }
+ 
+             private void doAddEndpoint(InetAddress ep, String dc, String rack)
+             {
+                 dcEndpoints.put(dc, ep);
+ 
+                 if (!dcRacks.containsKey(dc))
+                     dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+                 dcRacks.get(dc).put(rack, ep);
+ 
+                 currentLocations.put(ep, Pair.create(dc, rack));
+             }
+ 
+             /**
+              * Removes current DC/rack assignment for ep
+              */
+             Builder removeEndpoint(InetAddress ep)
+             {
+                 if (!currentLocations.containsKey(ep))
+                     return this;
+ 
+                 doRemoveEndpoint(ep, currentLocations.remove(ep));
+                 return this;
+             }
+ 
+             private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+             {
+                 dcRacks.get(current.left).remove(current.right, ep);
+                 dcEndpoints.remove(current.left, ep);
+             }
+ 
+             Builder updateEndpoint(InetAddress ep)
+             {
+                 IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                 if (snitch == null || !currentLocations.containsKey(ep))
+                     return this;
  
-             for (InetAddress ep : currentLocations.keySet())
                  updateEndpoint(ep, snitch);
-         }
+                 return this;
+             }
  
-         private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
-         {
-             Pair<String, String> current = currentLocations.get(ep);
-             String dc = snitch.getDatacenter(ep);
-             String rack = snitch.getRack(ep);
-             if (dc.equals(current.left) && rack.equals(current.right))
-                 return;
+             Builder updateEndpoints()
+             {
+                 IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                 if (snitch == null)
+                     return this;
  
-             doRemoveEndpoint(ep, current);
-             doAddEndpoint(ep, dc, rack);
-         }
+                 for (InetAddress ep : currentLocations.keySet())
+                     updateEndpoint(ep, snitch);
  
-         /**
-          * @return multi-map of DC to endpoints in that DC
-          */
-         public Multimap<String, InetAddress> getDatacenterEndpoints()
-         {
-             return dcEndpoints;
-         }
+                 return this;
+             }
  
-         /**
-          * @return map of DC to multi-map of rack to endpoints in that rack
-          */
-         public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
-         {
-             return dcRacks;
-         }
+             private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+             {
+                 Pair<String, String> current = currentLocations.get(ep);
+                 String dc = snitch.getDatacenter(ep);
+                 String rack = snitch.getRack(ep);
+                 if (dc.equals(current.left) && rack.equals(current.right))
+                     return;
  
-         /**
-          * @return The DC and rack of the given endpoint.
-          */
-         public Pair<String, String> getLocation(InetAddress addr)
-         {
-             return currentLocations.get(addr);
-         }
+                 doRemoveEndpoint(ep, current);
+                 doAddEndpoint(ep, dc, rack);
+             }
  
+             Topology build()
+             {
+                 return new Topology(this);
+             }
+         }
 -
      }
  }
diff --cc test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index f64b84a,bbfdd3b..48dd573
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@@ -21,17 -21,17 +21,18 @@@ package org.apache.cassandra.locator
  import java.io.IOException;
  import java.net.InetAddress;
  import java.net.UnknownHostException;
 -import java.util.ArrayList;
 -import java.util.HashMap;
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Set;
 +import java.util.*;
 +import java.util.stream.Collectors;
  
  import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.ImmutableMultimap;
  import com.google.common.collect.Multimap;
 +
  import org.junit.Assert;
 +import org.junit.BeforeClass;
  import org.junit.Test;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -175,203 -166,4 +176,203 @@@ public class NetworkTopologyStrategyTes
          InetAddress add1 = InetAddress.getByAddress(bytes);
          metadata.updateNormalToken(token1, add1);
      }
 +
 +    @Test
 +    public void testCalculateEndpoints() throws UnknownHostException
 +    {
 +        final int NODES = 100;
 +        final int VNODES = 64;
 +        final int RUNS = 10;
 +        StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
 +        Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
 +        List<InetAddress> nodes = new ArrayList<>(NODES);
 +        for (byte i=0; i<NODES; ++i)
 +            nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
 +        for (int run=0; run<RUNS; ++run)
 +        {
 +            Random rand = new Random();
 +            IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
 +            DatabaseDescriptor.setEndpointSnitch(snitch);
 +
 +            TokenMetadata meta = new TokenMetadata();
 +            for (int i=0; i<NODES; ++i)  // Nodes
 +                for (int j=0; j<VNODES; ++j) // tokens/vnodes per node
 +                    meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
 +            testEquivalence(meta, snitch, datacenters, rand);
 +        }
 +    }
 +
 +    void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
 +    {
 +        NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
 +                                                                  datacenters.entrySet().stream().
 +                                                                      collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
 +        for (int i=0; i<1000; ++i)
 +        {
 +            Token token = Murmur3Partitioner.instance.getRandomToken(rand);
 +            List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
 +            List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
 +            if (endpointsDiffer(expected, actual))
 +            {
 +                System.err.println("Endpoints mismatch for token " + token);
 +                System.err.println(" expected: " + expected);
 +                System.err.println(" actual  : " + actual);
 +                Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
 +            }
 +        }
 +    }
 +
 +    private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
 +    {
 +        // Because the old algorithm does not put the nodes in the correct order in the case where more replicas
 +        // are required than there are racks in a dc, we accept different order as long as the primary
 +        // replica is the same.
 +        if (ep1.equals(ep2))
 +            return false;
 +        if (!ep1.get(0).equals(ep2.get(0)))
 +            return true;
 +        Set<InetAddress> s1 = new HashSet<>(ep1);
 +        Set<InetAddress> s2 = new HashSet<>(ep2);
 +        return !s1.equals(s2);
 +    }
 +
 +    IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
 +    {
 +        final Map<InetAddress, String> nodeToRack = new HashMap<>();
 +        final Map<InetAddress, String> nodeToDC = new HashMap<>();
 +        Map<String, List<String>> racksPerDC = new HashMap<>();
 +        datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
 +        int rf = datacenters.values().stream().mapToInt(x -> x).sum();
 +        String[] dcs = new String[rf];
 +        int pos = 0;
 +        for (Map.Entry<String, Integer> dce : datacenters.entrySet())
 +        {
 +            for (int i = 0; i < dce.getValue(); ++i)
 +                dcs[pos++] = dce.getKey();
 +        }
 +
 +        for (InetAddress node : nodes)
 +        {
 +            String dc = dcs[rand.nextInt(rf)];
 +            List<String> racks = racksPerDC.get(dc);
 +            String rack = racks.get(rand.nextInt(racks.size()));
 +            nodeToRack.put(node, rack);
 +            nodeToDC.put(node, dc);
 +        }
 +
 +        return new AbstractNetworkTopologySnitch()
 +        {
 +            public String getRack(InetAddress endpoint)
 +            {
 +                return nodeToRack.get(endpoint);
 +            }
 +
 +            public String getDatacenter(InetAddress endpoint)
 +            {
 +                return nodeToDC.get(endpoint);
 +            }
 +        };
 +    }
 +
 +    private List<String> randomRacks(int rf, Random rand)
 +    {
 +        int rc = rand.nextInt(rf * 3 - 1) + 1;
 +        List<String> racks = new ArrayList<>(rc);
 +        for (int i=0; i<rc; ++i)
 +            racks.add(Integer.toString(i));
 +        return racks;
 +    }
 +
 +    // Copy of older endpoints calculation algorithm for comparison
 +    public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
 +    {
 +        // we want to preserve insertion order so that the first added endpoint becomes primary
 +        Set<InetAddress> replicas = new LinkedHashSet<>();
 +        // replicas we have found in each DC
 +        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
 +        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
 +            dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
 +
 +        Topology topology = tokenMetadata.getTopology();
 +        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
 +        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
 +        // all racks in a DC so we can check when we have exhausted all racks in a DC
-         Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
++        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
 +        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
 +
 +        // tracks the racks we have already placed replicas in
 +        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
 +        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
 +            seenRacks.put(dc.getKey(), new HashSet<String>());
 +
 +        // tracks the endpoints that we skipped over while looking for unique racks
 +        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
 +        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
 +        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
 +            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
 +
 +        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
 +        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
 +        {
 +            Token next = tokenIter.next();
 +            InetAddress ep = tokenMetadata.getEndpoint(next);
 +            String dc = snitch.getDatacenter(ep);
 +            // have we already found all replicas for this dc?
 +            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
 +                continue;
 +            // can we skip checking the rack?
 +            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
 +            {
 +                dcReplicas.get(dc).add(ep);
 +                replicas.add(ep);
 +            }
 +            else
 +            {
 +                String rack = snitch.getRack(ep);
 +                // is this a new rack?
 +                if (seenRacks.get(dc).contains(rack))
 +                {
 +                    skippedDcEndpoints.get(dc).add(ep);
 +                }
 +                else
 +                {
 +                    dcReplicas.get(dc).add(ep);
 +                    replicas.add(ep);
 +                    seenRacks.get(dc).add(rack);
 +                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
 +                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
 +                    {
 +                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
 +                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
 +                        {
 +                            InetAddress nextSkipped = skippedIt.next();
 +                            dcReplicas.get(dc).add(nextSkipped);
 +                            replicas.add(nextSkipped);
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +
 +        return new ArrayList<InetAddress>(replicas);
 +    }
 +
 +    private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
 +    {
 +        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
 +    }
 +
 +    private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
 +    {
 +        for (String dc : datacenters.keySet())
 +            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
 +                return false;
 +        return true;
 +    }
 +
 +    public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
 +    {
 +        Integer replicas = datacenters.get(dc);
 +        return replicas == null ? 0 : replicas;
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org