You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/05 15:29:07 UTC
[13/20] cassandra git commit: Add mistakenly forgotten files for
CASSANDRA-9258
Add mistakenly forgotten files for CASSANDRA-9258
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e0c1b0bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e0c1b0bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e0c1b0bb
Branch: refs/heads/cassandra-3.3
Commit: e0c1b0bb7121df1cc0185ffc0b35547f75daa281
Parents: 6ff1cbb
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jan 5 15:26:54 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jan 5 15:26:54 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/locator/PendingRangeMaps.java | 209 +++++++++++++++++++
.../test/microbench/PendingRangesBench.java | 89 ++++++++
.../cassandra/locator/PendingRangeMapsTest.java | 78 +++++++
4 files changed, 377 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c919c7..648200b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * Optimize pending range computation (CASSANDRA-9258)
* Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902)
* drop/alter user should be case sensitive (CASSANDRA-10817)
* jemalloc detection fails due to quoting issues in regexv (CASSANDRA-10946)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
new file mode 100644
index 0000000..1892cc3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -0,0 +1,209 @@
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.*;
+
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
+
+ /**
+ * We have for NavigableMap to be able to search for ranges containing a token efficiently.
+ *
+ * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
+ */
+ // ascendingMap will sort the ranges by the ascending order of right token
+ final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;
+ /**
+ * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
+ * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
+ */
+ static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
+ {
+ @Override
+ public int compare(Range<Token> o1, Range<Token> o2)
+ {
+ int res = o1.right.compareTo(o2.right);
+ if (res != 0)
+ return res;
+
+ return o2.left.compareTo(o1.left);
+ }
+ };
+
+ // ascendingMap will sort the ranges by the descending order of left token
+ final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;
+ /**
+ * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
+ * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
+ */
+ static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
+ {
+ @Override
+ public int compare(Range<Token> o1, Range<Token> o2)
+ {
+ int res = o2.left.compareTo(o1.left);
+ if (res != 0)
+ return res;
+
+ // if left tokens are same, sort by the descending of the right tokens.
+ return o2.right.compareTo(o1.right);
+ }
+ };
+
+ // these two maps are for warp around ranges.
+ final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;
+ /**
+ * for wrap around range (begin, end], which begin > end.
+ * Sorting end ascending, if ends are same, sorting begin ascending,
+ * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in
+ * the tailMap.
+ */
+ static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
+ {
+ @Override
+ public int compare(Range<Token> o1, Range<Token> o2)
+ {
+ int res = o1.right.compareTo(o2.right);
+ if (res != 0)
+ return res;
+
+ return o1.left.compareTo(o2.left);
+ }
+ };
+
+ final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;
+ /**
+ * for wrap around ranges, which begin > end.
+ * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
+ * and (begin, end) won't be selected in the tailMap.
+ */
+ static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
+ {
+ @Override
+ public int compare(Range<Token> o1, Range<Token> o2)
+ {
+ int res = o2.left.compareTo(o1.left);
+ if (res != 0)
+ return res;
+ return o1.right.compareTo(o2.right);
+ }
+ };
+
+ public PendingRangeMaps()
+ {
+ this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
+ this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
+ this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
+ this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
+ }
+
+ static final void addToMap(Range<Token> range,
+ InetAddress address,
+ NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
+ NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
+ {
+ List<InetAddress> addresses = ascendingMap.get(range);
+ if (addresses == null)
+ {
+ addresses = new ArrayList<InetAddress>(1);
+ ascendingMap.put(range, addresses);
+ descendingMap.put(range, addresses);
+ }
+ addresses.add(address);
+ }
+
+ public void addPendingRange(Range<Token> range, InetAddress address)
+ {
+ if (Range.isWrapAround(range.left, range.right))
+ {
+ addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
+ }
+ else
+ {
+ addToMap(range, address, ascendingMap, descendingMap);
+ }
+ }
+
+ static final void addIntersections(Set<InetAddress> endpointsToAdd,
+ NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
+ NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
+ {
+ // find the intersection of two sets
+ for (Range<Token> range : smallerMap.keySet())
+ {
+ List<InetAddress> addresses = biggerMap.get(range);
+ if (addresses != null)
+ {
+ endpointsToAdd.addAll(addresses);
+ }
+ }
+ }
+
+ public Collection<InetAddress> pendingEndpointsFor(Token token)
+ {
+ Set<InetAddress> endpoints = new HashSet<>();
+
+ Range searchRange = new Range(token, token);
+
+ // search for non-wrap-around maps
+ NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+ NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+
+ // add intersections of two maps
+ if (ascendingTailMap.size() < descendingTailMap.size())
+ {
+ addIntersections(endpoints, ascendingTailMap, descendingTailMap);
+ }
+ else
+ {
+ addIntersections(endpoints, descendingTailMap, ascendingTailMap);
+ }
+
+ // search for wrap-around sets
+ ascendingTailMap = ascendingMapForWrapAround.tailMap(searchRange, true);
+ descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
+
+ // add them since they are all necessary.
+ for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet())
+ {
+ endpoints.addAll(entry.getValue());
+ }
+ for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet())
+ {
+ endpoints.addAll(entry.getValue());
+ }
+
+ return endpoints;
+ }
+
+ public String printPendingRanges()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
+ {
+ Range<Token> range = entry.getKey();
+
+ for (InetAddress address : entry.getValue())
+ {
+ sb.append(address).append(':').append(range);
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ return sb.toString();
+ }
+
+ @Override
+ public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
+ {
+ return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
new file mode 100644
index 0000000..e50cbaf
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -0,0 +1,89 @@
+package org.apache.cassandra.test.microbench;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.PendingRangeMaps;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class PendingRangesBench
+{
+ PendingRangeMaps pendingRangeMaps;
+ int maxToken = 256 * 100;
+
+ Multimap<Range<Token>, InetAddress> oldPendingRanges;
+
+ private Range<Token> genRange(String left, String right)
+ {
+ return new Range<Token>(new RandomPartitioner.BigIntegerToken(left), new RandomPartitioner.BigIntegerToken(right));
+ }
+
+ @Setup
+ public void setUp() throws UnknownHostException
+ {
+ pendingRangeMaps = new PendingRangeMaps();
+ oldPendingRanges = HashMultimap.create();
+
+ InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")};
+
+ for (int i = 0; i < maxToken; i++)
+ {
+ for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+ {
+ Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15));
+ pendingRangeMaps.addPendingRange(range, addresses[j]);
+ oldPendingRanges.put(range, addresses[j]);
+ }
+ }
+
+ // add the wrap around range
+ for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
+ {
+ Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5));
+ pendingRangeMaps.addPendingRange(range, addresses[j]);
+ oldPendingRanges.put(range, addresses[j]);
+ }
+ }
+
+ @Benchmark
+ public void searchToken(final Blackhole bh)
+ {
+ int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+ Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+ bh.consume(pendingRangeMaps.pendingEndpointsFor(searchToken));
+ }
+
+ @Benchmark
+ public void searchTokenForOldPendingRanges(final Blackhole bh)
+ {
+ int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
+ Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
+ Set<InetAddress> endpoints = new HashSet<>();
+ for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet())
+ {
+ if (entry.getKey().contains(searchToken))
+ endpoints.addAll(entry.getValue());
+ }
+ bh.consume(endpoints);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0c1b0bb/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
new file mode 100644
index 0000000..6d24447
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -0,0 +1,78 @@
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PendingRangeMapsTest {
+
+ private Range<Token> genRange(String left, String right)
+ {
+ return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
+ }
+
+ @Test
+ public void testPendingEndpoints() throws UnknownHostException
+ {
+ PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+ pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+ pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+ pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+ pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+ pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+ pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+
+ assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+ assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+ Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+ }
+
+ @Test
+ public void testWrapAroundRanges() throws UnknownHostException
+ {
+ PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+ pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+ pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+ pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+ pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+ pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+ pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+ pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7"));
+
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("7")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+ Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.7")));
+ }
+}