You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/07 11:37:18 UTC

[1/3] cassandra git commit: Add forgotten file for 9258 yet again

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.3 ffc0840d0 -> 5bf69abe1
  refs/heads/trunk 6ec5d55b9 -> 83aeeca30


Add forgotten file for 9258 yet again


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5bf69abe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5bf69abe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5bf69abe

Branch: refs/heads/cassandra-3.3
Commit: 5bf69abe137ffeb89abaa07590dd0e1a2eaa54e1
Parents: ffc0840
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 7 11:36:57 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 7 11:36:57 2016 +0100

----------------------------------------------------------------------
 .../cassandra/locator/PendingRangeMaps.java     | 209 +++++++++++++++++++
 .../test/microbench/PendingRangesBench.java     |  89 ++++++++
 .../cassandra/locator/PendingRangeMapsTest.java |  78 +++++++
 3 files changed, 376 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
new file mode 100644
index 0000000..1892cc3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -0,0 +1,209 @@
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.*;
+
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
+
+    /**
+     * We have for NavigableMap to be able to search for ranges containing a token efficiently.
+     *
+     * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
+     */
+    // ascendingMap will sort the ranges by the ascending order of right token
+    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;
+    /**
+     * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
+     * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
+        {
+            @Override
+            public int compare(Range<Token> o1, Range<Token> o2)
+            {
+                int res = o1.right.compareTo(o2.right);
+                if (res != 0)
+                    return res;
+
+                return o2.left.compareTo(o1.left);
+            }
+        };
+
+    // ascendingMap will sort the ranges by the descending order of left token
+    final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;
+    /**
+     * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
+     * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
+        {
+            @Override
+            public int compare(Range<Token> o1, Range<Token> o2)
+            {
+                int res = o2.left.compareTo(o1.left);
+                if (res != 0)
+                    return res;
+
+                // if left tokens are same, sort by the descending of the right tokens.
+                return o2.right.compareTo(o1.right);
+            }
+        };
+
+    // these two maps are for warp around ranges.
+    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;
+    /**
+     * for wrap around range (begin, end], which begin > end.
+     * Sorting end ascending, if ends are same, sorting begin ascending,
+     * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in
+     * the tailMap.
+     */
+    static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
+    {
+        @Override
+        public int compare(Range<Token> o1, Range<Token> o2)
+        {
+            int res = o1.right.compareTo(o2.right);
+            if (res != 0)
+                return res;
+
+            return o1.left.compareTo(o2.left);
+        }
+    };
+
+    final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;
+    /**
+     * for wrap around ranges, which begin > end.
+     * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
+     * and (begin, end) won't be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
+    {
+        @Override
+        public int compare(Range<Token> o1, Range<Token> o2)
+        {
+            int res = o2.left.compareTo(o1.left);
+            if (res != 0)
+                return res;
+            return o1.right.compareTo(o2.right);
+        }
+    };
+
+    public PendingRangeMaps()
+    {
+        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
+        this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
+        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
+        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
+    }
+
+    static final void addToMap(Range<Token> range,
+                               InetAddress address,
+                               NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
+                               NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
+    {
+        List<InetAddress> addresses = ascendingMap.get(range);
+        if (addresses == null)
+        {
+            addresses = new ArrayList<InetAddress>(1);
+            ascendingMap.put(range, addresses);
+            descendingMap.put(range, addresses);
+        }
+        addresses.add(address);
+    }
+
+    public void addPendingRange(Range<Token> range, InetAddress address)
+    {
+        if (Range.isWrapAround(range.left, range.right))
+        {
+            addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
+        }
+        else
+        {
+            addToMap(range, address, ascendingMap, descendingMap);
+        }
+    }
+
+    static final void addIntersections(Set<InetAddress> endpointsToAdd,
+                                       NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
+                                       NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
+    {
+        // find the intersection of two sets
+        for (Range<Token> range : smallerMap.keySet())
+        {
+            List<InetAddress> addresses = biggerMap.get(range);
+            if (addresses != null)
+            {
+                endpointsToAdd.addAll(addresses);
+            }
+        }
+    }
+
+    public Collection<InetAddress> pendingEndpointsFor(Token token)
+    {
+        Set<InetAddress> endpoints = new HashSet<>();
+
+        Range searchRange = new Range(token, token);
+
+        // search for non-wrap-around maps
+        NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+        NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+
+        // add intersections of two maps
+        if (ascendingTailMap.size() < descendingTailMap.size())
+        {
+            addIntersections(endpoints, ascendingTailMap, descendingTailMap);
+        }
+        else
+        {
+            addIntersections(endpoints, descendingTailMap, ascendingTailMap);
+        }
+
+        // search for wrap-around sets
+        ascendingTailMap = ascendingMapForWrapAround.tailMap(searchRange, true);
+        descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
+
+        // add them since they are all necessary.
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet())
+        {
+            endpoints.addAll(entry.getValue());
+        }
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet())
+        {
+            endpoints.addAll(entry.getValue());
+        }
+
+        return endpoints;
+    }
+
+    public String printPendingRanges()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
+        {
+            Range<Token> range = entry.getKey();
+
+            for (InetAddress address : entry.getValue())
+            {
+                sb.append(address).append(':').append(range);
+                sb.append(System.getProperty("line.separator"));
+            }
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
+    {
+        return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
new file mode 100644
index 0000000..e50cbaf
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -0,0 +1,89 @@
+package org.apache.cassandra.test.microbench;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.PendingRangeMaps;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class PendingRangesBench
+{
+    PendingRangeMaps pendingRangeMaps;
+    int maxToken = 256 * 100;
+
+    Multimap<Range<Token>, InetAddress> oldPendingRanges;
+
+    private Range<Token> genRange(String left, String right)
+    {
+        return new Range<Token>(new RandomPartitioner.BigIntegerToken(left), new RandomPartitioner.BigIntegerToken(right));
+    }
+
+    @Setup
+    public void setUp() throws UnknownHostException
+    {
+        pendingRangeMaps = new PendingRangeMaps();
+        oldPendingRanges = HashMultimap.create();
+
+        InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")};
+
+        for (int i = 0; i < maxToken; i++)
+        {
+            for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+            {
+                Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15));
+                pendingRangeMaps.addPendingRange(range, addresses[j]);
+                oldPendingRanges.put(range, addresses[j]);
+            }
+        }
+
+        // add the wrap around range
+        for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+        {
+            Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5));
+            pendingRangeMaps.addPendingRange(range, addresses[j]);
+            oldPendingRanges.put(range, addresses[j]);
+        }
+    }
+
+    @Benchmark
+    public void searchToken(final Blackhole bh)
+    {
+        int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+        Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+        bh.consume(pendingRangeMaps.pendingEndpointsFor(searchToken));
+    }
+
+    @Benchmark
+    public void searchTokenForOldPendingRanges(final Blackhole bh)
+    {
+        int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+        Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+        Set<InetAddress> endpoints = new HashSet<>();
+        for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet())
+        {
+            if (entry.getKey().contains(searchToken))
+                endpoints.addAll(entry.getValue());
+        }
+        bh.consume(endpoints);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
new file mode 100644
index 0000000..6d24447
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -0,0 +1,78 @@
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PendingRangeMapsTest {
+
+    private Range<Token> genRange(String left, String right)
+    {
+        return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
+    }
+
+    @Test
+    public void testPendingEndpoints() throws UnknownHostException
+    {
+        PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+
+        assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+        assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+        Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+    }
+
+    @Test
+    public void testWrapAroundRanges() throws UnknownHostException
+    {
+        PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+        pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7"));
+
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("7")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+        Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.7")));
+    }
+}


[2/3] cassandra git commit: Add forgotten file for 9258 yet again

Posted by sl...@apache.org.
Add forgotten file for 9258 yet again


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5bf69abe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5bf69abe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5bf69abe

Branch: refs/heads/trunk
Commit: 5bf69abe137ffeb89abaa07590dd0e1a2eaa54e1
Parents: ffc0840
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 7 11:36:57 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 7 11:36:57 2016 +0100

----------------------------------------------------------------------
 .../cassandra/locator/PendingRangeMaps.java     | 209 +++++++++++++++++++
 .../test/microbench/PendingRangesBench.java     |  89 ++++++++
 .../cassandra/locator/PendingRangeMapsTest.java |  78 +++++++
 3 files changed, 376 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
new file mode 100644
index 0000000..1892cc3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -0,0 +1,209 @@
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.*;
+
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
+
+    /**
+     * We have for NavigableMap to be able to search for ranges containing a token efficiently.
+     *
+     * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
+     */
+    // ascendingMap will sort the ranges by the ascending order of right token
+    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;
+    /**
+     * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
+     * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
+        {
+            @Override
+            public int compare(Range<Token> o1, Range<Token> o2)
+            {
+                int res = o1.right.compareTo(o2.right);
+                if (res != 0)
+                    return res;
+
+                return o2.left.compareTo(o1.left);
+            }
+        };
+
+    // ascendingMap will sort the ranges by the descending order of left token
+    final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;
+    /**
+     * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
+     * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
+        {
+            @Override
+            public int compare(Range<Token> o1, Range<Token> o2)
+            {
+                int res = o2.left.compareTo(o1.left);
+                if (res != 0)
+                    return res;
+
+                // if left tokens are same, sort by the descending of the right tokens.
+                return o2.right.compareTo(o1.right);
+            }
+        };
+
+    // these two maps are for warp around ranges.
+    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;
+    /**
+     * for wrap around range (begin, end], which begin > end.
+     * Sorting end ascending, if ends are same, sorting begin ascending,
+     * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in
+     * the tailMap.
+     */
+    static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
+    {
+        @Override
+        public int compare(Range<Token> o1, Range<Token> o2)
+        {
+            int res = o1.right.compareTo(o2.right);
+            if (res != 0)
+                return res;
+
+            return o1.left.compareTo(o2.left);
+        }
+    };
+
+    final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;
+    /**
+     * for wrap around ranges, which begin > end.
+     * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
+     * and (begin, end) won't be selected in the tailMap.
+     */
+    static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
+    {
+        @Override
+        public int compare(Range<Token> o1, Range<Token> o2)
+        {
+            int res = o2.left.compareTo(o1.left);
+            if (res != 0)
+                return res;
+            return o1.right.compareTo(o2.right);
+        }
+    };
+
+    public PendingRangeMaps()
+    {
+        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
+        this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
+        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
+        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
+    }
+
+    static final void addToMap(Range<Token> range,
+                               InetAddress address,
+                               NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
+                               NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
+    {
+        List<InetAddress> addresses = ascendingMap.get(range);
+        if (addresses == null)
+        {
+            addresses = new ArrayList<InetAddress>(1);
+            ascendingMap.put(range, addresses);
+            descendingMap.put(range, addresses);
+        }
+        addresses.add(address);
+    }
+
+    public void addPendingRange(Range<Token> range, InetAddress address)
+    {
+        if (Range.isWrapAround(range.left, range.right))
+        {
+            addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
+        }
+        else
+        {
+            addToMap(range, address, ascendingMap, descendingMap);
+        }
+    }
+
+    static final void addIntersections(Set<InetAddress> endpointsToAdd,
+                                       NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
+                                       NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
+    {
+        // find the intersection of two sets
+        for (Range<Token> range : smallerMap.keySet())
+        {
+            List<InetAddress> addresses = biggerMap.get(range);
+            if (addresses != null)
+            {
+                endpointsToAdd.addAll(addresses);
+            }
+        }
+    }
+
+    public Collection<InetAddress> pendingEndpointsFor(Token token)
+    {
+        Set<InetAddress> endpoints = new HashSet<>();
+
+        Range searchRange = new Range(token, token);
+
+        // search for non-wrap-around maps
+        NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+        NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+
+        // add intersections of two maps
+        if (ascendingTailMap.size() < descendingTailMap.size())
+        {
+            addIntersections(endpoints, ascendingTailMap, descendingTailMap);
+        }
+        else
+        {
+            addIntersections(endpoints, descendingTailMap, ascendingTailMap);
+        }
+
+        // search for wrap-around sets
+        ascendingTailMap = ascendingMapForWrapAround.tailMap(searchRange, true);
+        descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
+
+        // add them since they are all necessary.
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet())
+        {
+            endpoints.addAll(entry.getValue());
+        }
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet())
+        {
+            endpoints.addAll(entry.getValue());
+        }
+
+        return endpoints;
+    }
+
+    public String printPendingRanges()
+    {
+        StringBuilder sb = new StringBuilder();
+
+        for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
+        {
+            Range<Token> range = entry.getKey();
+
+            for (InetAddress address : entry.getValue())
+            {
+                sb.append(address).append(':').append(range);
+                sb.append(System.getProperty("line.separator"));
+            }
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
+    {
+        return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
new file mode 100644
index 0000000..e50cbaf
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -0,0 +1,89 @@
+package org.apache.cassandra.test.microbench;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.PendingRangeMaps;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class PendingRangesBench
+{
+    PendingRangeMaps pendingRangeMaps;
+    int maxToken = 256 * 100;
+
+    Multimap<Range<Token>, InetAddress> oldPendingRanges;
+
+    private Range<Token> genRange(String left, String right)
+    {
+        return new Range<Token>(new RandomPartitioner.BigIntegerToken(left), new RandomPartitioner.BigIntegerToken(right));
+    }
+
+    @Setup
+    public void setUp() throws UnknownHostException
+    {
+        pendingRangeMaps = new PendingRangeMaps();
+        oldPendingRanges = HashMultimap.create();
+
+        InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")};
+
+        for (int i = 0; i < maxToken; i++)
+        {
+            for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+            {
+                Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15));
+                pendingRangeMaps.addPendingRange(range, addresses[j]);
+                oldPendingRanges.put(range, addresses[j]);
+            }
+        }
+
+        // add the wrap around range
+        for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+        {
+            Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5));
+            pendingRangeMaps.addPendingRange(range, addresses[j]);
+            oldPendingRanges.put(range, addresses[j]);
+        }
+    }
+
+    @Benchmark
+    public void searchToken(final Blackhole bh)
+    {
+        int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+        Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+        bh.consume(pendingRangeMaps.pendingEndpointsFor(searchToken));
+    }
+
+    @Benchmark
+    public void searchTokenForOldPendingRanges(final Blackhole bh)
+    {
+        int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+        Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+        Set<InetAddress> endpoints = new HashSet<>();
+        for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet())
+        {
+            if (entry.getKey().contains(searchToken))
+                endpoints.addAll(entry.getValue());
+        }
+        bh.consume(endpoints);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bf69abe/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
new file mode 100644
index 0000000..6d24447
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -0,0 +1,78 @@
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PendingRangeMapsTest {
+
+    private Range<Token> genRange(String left, String right)
+    {
+        return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
+    }
+
+    @Test
+    public void testPendingEndpoints() throws UnknownHostException
+    {
+        PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+
+        assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+        assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+        Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+    }
+
+    @Test
+    public void testWrapAroundRanges() throws UnknownHostException
+    {
+        PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+        pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+        pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+        pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+        pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+        pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+        pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+        pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7"));
+
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("7")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+        assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+        assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+        Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+        assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.7")));
+    }
+}


[3/3] cassandra git commit: Merge branch 'cassandra-3.3' into trunk

Posted by sl...@apache.org.
Merge branch 'cassandra-3.3' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83aeeca3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83aeeca3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83aeeca3

Branch: refs/heads/trunk
Commit: 83aeeca3053b15272713388601f9d7907286a0d4
Parents: 6ec5d55 5bf69ab
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 7 11:37:03 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jan 7 11:37:03 2016 +0100

----------------------------------------------------------------------
 .../cassandra/locator/PendingRangeMaps.java     | 209 +++++++++++++++++++
 .../test/microbench/PendingRangesBench.java     |  89 ++++++++
 .../cassandra/locator/PendingRangeMapsTest.java |  78 +++++++
 3 files changed, 376 insertions(+)
----------------------------------------------------------------------