You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/29 11:29:43 UTC
[cassandra-accord] 02/02: KeyRange refactor
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 210ff37b75fc07603b5f655171f6c49cf75805da
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Sep 14 13:52:17 2021 -0700
KeyRange refactor
---
accord-core/src/main/java/accord/api/KeyRange.java | 163 +++++++++++++++++++++
accord-core/src/main/java/accord/api/Read.java | 2 +-
accord-core/src/main/java/accord/api/Write.java | 2 +-
.../src/main/java/accord/topology/KeyRanges.java | 48 ++++++
.../src/main/java/accord/topology/Shard.java | 13 +-
.../src/main/java/accord/topology/Shards.java | 16 +-
.../src/main/java/accord/topology/Topology.java | 118 +++++----------
accord-core/src/main/java/accord/txn/Keys.java | 62 ++------
accord-core/src/main/java/accord/txn/Txn.java | 14 +-
accord-core/src/main/java/accord/txn/Writes.java | 2 +-
.../src/main/java/accord/utils/KeyRange.java | 20 ---
.../src/test/java/accord/impl/IntHashKey.java | 19 ++-
accord-core/src/test/java/accord/impl/IntKey.java | 43 +++++-
.../src/test/java/accord/impl/TopologyFactory.java | 4 +-
.../src/test/java/accord/impl/list/ListRead.java | 14 +-
.../src/test/java/accord/impl/list/ListWrite.java | 8 +-
.../test/java/accord/impl/mock/MockCluster.java | 5 +-
.../src/test/java/accord/impl/mock/MockStore.java | 4 +-
.../test/java/accord/topology/TopologyTest.java | 77 ++++++++++
.../src/test/java/accord/utils/KeyRangeTest.java | 150 +++++++++++++++++++
.../src/test/java/accord/utils/KeyRangesTest.java | 31 ++++
.../main/java/accord/maelstrom/MaelstromKey.java | 9 ++
.../main/java/accord/maelstrom/MaelstromRead.java | 14 +-
.../main/java/accord/maelstrom/MaelstromWrite.java | 8 +-
.../java/accord/maelstrom/TopologyFactory.java | 19 ++-
25 files changed, 644 insertions(+), 221 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/KeyRange.java b/accord-core/src/main/java/accord/api/KeyRange.java
new file mode 100644
index 0000000..9e6b850
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/KeyRange.java
@@ -0,0 +1,163 @@
+package accord.api;
+
+import accord.txn.Keys;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
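+/**
+ * A range of keys from {@code start} to {@code end}; the nested subclasses decide which of the two bounds is inclusive
+ * @param <K> the key type of the range's bounds
+ */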
+public abstract class KeyRange<K extends Key<K>>
+{
+ public static abstract class EndInclusive<K extends Key<K>> extends KeyRange<K>
+ {
+ public EndInclusive(K start, K end)
+ {
+ super(start, end);
+ }
+
+ @Override
+ public int compareKey(K key)
+ {
+ if (key.compareTo(start()) <= 0)
+ return -1;
+ if (key.compareTo(end()) > 0)
+ return 1;
+ return 0;
+ }
+
+ @Override
+ public boolean startInclusive()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean endInclusive()
+ {
+ return true;
+ }
+ }
+
+ public static abstract class StartInclusive<K extends Key<K>> extends KeyRange<K>
+ {
+ public StartInclusive(K start, K end)
+ {
+ super(start, end);
+ }
+
+ @Override
+ public int compareKey(K key)
+ {
+ if (key.compareTo(start()) < 0)
+ return -1;
+ if (key.compareTo(end()) >= 0)
+ return 1;
+ return 0;
+ }
+
+ @Override
+ public boolean startInclusive()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean endInclusive()
+ {
+ return false;
+ }
+ }
+
+ private final K start;
+ private final K end;
+
+ private KeyRange(K start, K end)
+ {
+ Preconditions.checkArgument(start.compareTo(end) < 0);
+ this.start = start;
+ this.end = end;
+ }
+
+ public final K start()
+ {
+ return start;
+ }
+
+ public final K end()
+ {
+ return end;
+ }
+
+ public abstract boolean startInclusive();
+
+ public abstract boolean endInclusive();
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ KeyRange<?> that = (KeyRange<?>) o;
+ return Objects.equals(start, that.start) && Objects.equals(end, that.end);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(start, end);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Range[" + start + ", " + end + ']';
+ }
+
+ /**
+ * Returns a negative integer, zero, or a positive integer as the provided key is less than, contained by,
+ * or greater than this range.
+ */
+ public abstract int compareKey(K key);
+
+ public boolean containsKey(K key)
+ {
+ return compareKey(key) == 0;
+ }
+
+ /**
+ * returns the index of the first key larger than what's covered by this range
+ */
+ public int higherKeyIndex(Keys keys, int lowerBound, int upperBound)
+ {
+ int i = keys.search(lowerBound, upperBound, this,
+ (k, r) -> ((KeyRange) r).compareKey((Key) k) <= 0 ? -1 : 1);
+ if (i < 0) i = -1 - i;
+ return i;
+ }
+
+ public int higherKeyIndex(Keys keys)
+ {
+ return higherKeyIndex(keys, 0, keys.size());
+ }
+
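+ /**
+ * returns the index of the first key not below this range (keys.size() if there is none), or -1 if keys is empty or its first key is above this range
+ * @param keys the keys to search; the lookup is a binary search, so they are expected to be sorted
+ */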
+ public int lowKeyIndex(Keys keys)
+ {
+ if (keys.isEmpty()) return -1;
+
+ int i = keys.search(0, keys.size(), this,
+ (k, r) -> ((KeyRange) r).compareKey((Key) k) < 0 ? -1 : 1);
+
+ if (i < 0) i = -1 - i;
+
+ if (i == 0 && !containsKey((K) keys.get(0))) i = -1;
+
+ return i;
+ }
+}
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index ff4fd2f..2149418 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -7,5 +7,5 @@ package accord.api;
*/
public interface Read
{
- Data read(Key start, Key end, Store store);
+ Data read(KeyRange range, Store store);
}
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index 4404ee6..eba020c 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -9,5 +9,5 @@ import accord.txn.Timestamp;
*/
public interface Write
{
- void apply(Key start, Key end, Timestamp executeAt, Store store);
+ void apply(KeyRange range, Timestamp executeAt, Store store);
}
diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java
new file mode 100644
index 0000000..90ed527
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/KeyRanges.java
@@ -0,0 +1,48 @@
+package accord.topology;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+
+import java.util.Arrays;
+
+public class KeyRanges
+{
+ public static final KeyRanges EMPTY = new KeyRanges(new KeyRange[0]);
+
+ private final KeyRange[] ranges;
+
+ public KeyRanges(KeyRange[] ranges)
+ {
+ this.ranges = ranges;
+ }
+
+ @Override
+ public String toString()
+ {
+ return Arrays.toString(ranges);
+ }
+
+ public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
+ {
+ return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
+ (r, k) -> -((KeyRange) r).compareKey((Key) k));
+ }
+
+ public int rangeIndexForKey(Key key)
+ {
+ return rangeIndexForKey(0, ranges.length, key);
+ }
+
+ public int size()
+ {
+ return ranges.length;
+ }
+
+ public KeyRanges select(int[] indexes)
+ {
+ KeyRange[] selection = new KeyRange[indexes.length];
+ for (int i=0; i<indexes.length; i++)
+ selection[i] = ranges[indexes[i]]; // take the range at each requested index, not simply the first indexes.length ranges
+ return new KeyRanges(selection);
+ }
+}
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index 925308c..fcd7ee2 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -3,6 +3,7 @@ package accord.topology;
import java.util.List;
import java.util.Set;
+import accord.api.KeyRange;
import accord.local.Node.Id;
import accord.api.Key;
import com.google.common.annotations.VisibleForTesting;
@@ -10,17 +11,16 @@ import com.google.common.base.Preconditions;
public class Shard
{
- public final Key start, end;
+ public final KeyRange range;
public final List<Id> nodes;
public final Set<Id> fastPathElectorate;
public final int recoveryFastPathSize;
public final int fastPathQuorumSize;
public final int slowPathQuorumSize;
- public Shard(Key start, Key end, List<Id> nodes, Set<Id> fastPathElectorate)
+ public Shard(KeyRange range, List<Id> nodes, Set<Id> fastPathElectorate)
{
- this.start = start;
- this.end = end;
+ this.range = range;
this.nodes = nodes;
int f = maxToleratedFailures(nodes.size());
this.fastPathElectorate = fastPathElectorate;
@@ -40,18 +40,17 @@ public class Shard
static int fastPathQuorumSize(int replicas, int electorate, int f)
{
Preconditions.checkArgument(electorate >= replicas - f);
-// return (fastPathElectorateSize + f + 1 + 1) / 2;
return (f + electorate)/2 + 1;
}
public boolean contains(Key key)
{
- return key.compareTo(start) >= 0 && key.compareTo(end) < 0;
+ return range.containsKey(key);
}
@Override
public String toString()
{
- return "Shard[" + start + ',' + end + ']';
+ return "Shard[" + range.start() + ',' + range.end() + ']';
}
}
diff --git a/accord-core/src/main/java/accord/topology/Shards.java b/accord-core/src/main/java/accord/topology/Shards.java
index 6c5eadf..4c46f1b 100644
--- a/accord-core/src/main/java/accord/topology/Shards.java
+++ b/accord-core/src/main/java/accord/topology/Shards.java
@@ -3,20 +3,30 @@ package accord.topology;
import java.util.Collections;
import java.util.Map;
+import accord.api.KeyRange;
import accord.local.Node.Id;
import accord.txn.Keys;
public class Shards extends Topology
{
- public static final Shards EMPTY = new Shards(Keys.EMPTY, new Shard[0], Collections.emptyMap(), Keys.EMPTY, new int[0]);
+ public static final Shards EMPTY = new Shards(new Shard[0], KeyRanges.EMPTY, Collections.emptyMap(), KeyRanges.EMPTY, new int[0]);
public Shards(Shard... shards)
{
super(shards);
}
- public Shards(Keys starts, Shard[] shards, Map<Id, NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+ public Shards(Shard[] shards, KeyRanges ranges, Map<Id, NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
{
- super(starts, shards, nodeLookup, subsetOfStarts, supersetIndexes);
+ super(shards, ranges, nodeLookup, subsetOfRanges, supersetIndexes);
+ }
+
+ public static Shards select(Shard[] shards, int[] indexes)
+ {
+ Shard[] subset = new Shard[indexes.length];
+ for (int i=0; i<indexes.length; i++)
+ subset[i] = shards[indexes[i]];
+
+ return new Shards(subset);
}
}
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index c3308b7..f568307 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -8,9 +8,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import accord.api.KeyRange;
import accord.local.Node.Id;
import accord.api.Key;
import accord.txn.Keys;
@@ -18,36 +18,35 @@ import accord.utils.IndexedConsumer;
public class Topology extends AbstractCollection<Shard>
{
- // TODO: introduce range version of Keys
- final Keys starts;
final Shard[] shards;
+ final KeyRanges ranges;
final Map<Id, Shards.NodeInfo> nodeLookup;
- final Keys subsetOfStarts;
+ final KeyRanges subsetOfRanges;
final int[] supersetIndexes;
static class NodeInfo
{
- final Keys starts;
+ final KeyRanges ranges;
final int[] supersetIndexes;
- NodeInfo(Keys starts, int[] supersetIndexes)
+ NodeInfo(KeyRanges ranges, int[] supersetIndexes)
{
- this.starts = starts;
+ this.ranges = ranges;
this.supersetIndexes = supersetIndexes;
}
@Override
public String toString()
{
- return starts.toString();
+ return ranges.toString();
}
}
public Topology(Shard... shards)
{
- this.starts = new Keys(Arrays.stream(shards).map(shard -> shard.start).sorted().collect(Collectors.toList()));
+ this.ranges = new KeyRanges(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
this.shards = shards;
- this.subsetOfStarts = starts;
+ this.subsetOfRanges = ranges;
this.supersetIndexes = IntStream.range(0, shards.length).toArray();
this.nodeLookup = new HashMap<>();
Map<Id, List<Integer>> build = new HashMap<>();
@@ -59,17 +58,17 @@ public class Topology extends AbstractCollection<Shard>
for (Map.Entry<Id, List<Integer>> e : build.entrySet())
{
int[] supersetIndexes = e.getValue().stream().mapToInt(i -> i).toArray();
- Keys starts = this.starts.select(supersetIndexes);
- nodeLookup.put(e.getKey(), new Shards.NodeInfo(starts, supersetIndexes));
+ KeyRanges ranges = this.ranges.select(supersetIndexes);
+ nodeLookup.put(e.getKey(), new Shards.NodeInfo(ranges, supersetIndexes));
}
}
- public Topology(Keys starts, Shard[] shards, Map<Id, Shards.NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+ public Topology(Shard[] shards, KeyRanges ranges, Map<Id, Shards.NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
{
- this.starts = starts;
this.shards = shards;
+ this.ranges = ranges;
this.nodeLookup = nodeLookup;
- this.subsetOfStarts = subsetOfStarts;
+ this.subsetOfRanges = subsetOfRanges;
this.supersetIndexes = supersetIndexes;
}
@@ -78,12 +77,14 @@ public class Topology extends AbstractCollection<Shard>
NodeInfo info = nodeLookup.get(node);
if (info == null)
return Shards.EMPTY;
- return forKeys(info.starts);
+ return Shards.select(shards, info.supersetIndexes);
}
public Shard forKey(Key key)
{
- int i = starts.floorIndex(key);
+ int i = ranges.rangeIndexForKey(key);
+ if (i < 0 || i >= ranges.size())
+ throw new IllegalArgumentException("Range not found for " + key);
return shards[i];
}
@@ -91,19 +92,23 @@ public class Topology extends AbstractCollection<Shard>
{
int subsetIndex = 0;
int count = 0;
- int[] newSubset = new int[Math.min(select.size(), subsetOfStarts.size())];
+ int[] newSubset = new int[Math.min(select.size(), subsetOfRanges.size())];
for (int i = 0 ; i < select.size() ; )
{
- subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
+ // find the range containing the key at i
+ subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
+ if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
+ throw new IllegalArgumentException("Range not found for " + select.get(i));
int supersetIndex = supersetIndexes[subsetIndex];
newSubset[count++] = supersetIndex;
Shard shard = shards[supersetIndex];
- i = select.ceilIndex(i, select.size(), shard.end);
+ // find the first key outside this range
+ i = shard.range.higherKeyIndex(select, i, select.size());
}
if (count != newSubset.length)
newSubset = Arrays.copyOf(newSubset, count);
- Keys subsetOfKeys = starts.select(newSubset);
- return new Shards(starts, shards, nodeLookup, subsetOfKeys, newSubset);
+ KeyRanges rangeSubset = ranges.select(newSubset);
+ return new Shards(shards, ranges, nodeLookup, rangeSubset, newSubset);
}
/**
@@ -113,21 +118,6 @@ public class Topology extends AbstractCollection<Shard>
public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
{
Shards.NodeInfo info = nodeLookup.get(on);
-// int nodeIndex = 0;
-// int subsetIndex = 0;
-// for (int i = select.ceilIndex(info.starts.get(0)) ; i < select.size() ; )
-// {
-// nodeIndex = info.starts.floorIndex(nodeIndex, info.starts.size(), select.get(i));
-// int supersetIndex = info.supersetIndexes[nodeIndex];
-// Shard shard = shards[supersetIndex];
-// if (shard.end.compareTo(select.get(i)) > 0)
-// {
-// subsetIndex = Arrays.binarySearch(supersetIndexes, subsetIndex, supersetIndexes.length, supersetIndex);
-// consumer.accept(subsetIndex, shard);
-// }
-// i = select.ceilIndex(i + 1, select.size(), shard.end);
-// }
-
for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
{
Key key = select.get(i);
@@ -135,9 +125,13 @@ public class Topology extends AbstractCollection<Shard>
int c = supersetIndexes[j] - info.supersetIndexes[k];
if (c < 0) ++j;
else if (c > 0) ++k;
- else if (key.compareTo(shard.start) < 0) ++i;
- else if (key.compareTo(shard.end) < 0) { consumer.accept(j, shard); i++; j++; k++; }
- else { j++; k++; }
+ else
+ {
+ int rcmp = shard.range.compareKey(key);
+ if (rcmp < 0) ++i;
+ else if (rcmp == 0) { consumer.accept(j, shard); i++; j++; k++; }
+ else { j++; k++; }
+ }
}
}
@@ -173,53 +167,17 @@ public class Topology extends AbstractCollection<Shard>
consumer.accept(i, shards[supersetIndexes[i]]);
}
-
public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
{
List<T> selection = new ArrayList<>();
-// int subsetIndex = 0;
-// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-// {
-// subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
-// selection.add(indexedByShard[subsetIndex]);
-// Shard shard = shards[supersetIndexes[subsetIndex]];
-// i = select.ceilIndex(i + 1, select.size(), shard.end);
-// }
-
-// int minSubsetIndex = 0;
-// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-// {
-// int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
-// selection.add(indexedByShard[subsetIndex]);
-// minSubsetIndex = subsetIndex + 1;
-// if (minSubsetIndex == supersetIndexes.length)
-// break;
-// Shard shard = shards[supersetIndexes[minSubsetIndex]];
-// i = select.ceilIndex(i + 1, select.size(), shard.start);
-// }
-
-// int minSubsetIndex = 0;
-// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-// {
-// int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
-// Shard shard = shards[supersetIndexes[subsetIndex]];
-// if (shard.end.compareTo(select.get(i)) > 0)
-// selection.add(indexedByShard[subsetIndex]);
-// minSubsetIndex = subsetIndex + 1;
-// if (minSubsetIndex == supersetIndexes.length)
-// break;
-//
-// shard = shards[supersetIndexes[minSubsetIndex]];
-// i = select.ceilIndex(i + 1, select.size(), shard.start);
-// }
-
for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
{
Key k = select.get(i);
Shard shard = shards[supersetIndexes[j]];
- int c = k.compareTo(shard.start);
+
+ int c = shard.range.compareKey(k);
if (c < 0) ++i;
- else if (k.compareTo(shard.end) < 0) { selection.add(indexedByShard[j++]); i++; }
+ else if (c == 0) { selection.add(indexedByShard[j++]); i++; }
else j++;
}
@@ -235,7 +193,7 @@ public class Topology extends AbstractCollection<Shard>
@Override
public int size()
{
- return subsetOfStarts.size();
+ return subsetOfRanges.size();
}
public Shard get(int index)
diff --git a/accord-core/src/main/java/accord/txn/Keys.java b/accord-core/src/main/java/accord/txn/Keys.java
index 4e49a8c..f72d419 100644
--- a/accord-core/src/main/java/accord/txn/Keys.java
+++ b/accord-core/src/main/java/accord/txn/Keys.java
@@ -1,9 +1,6 @@
package accord.txn;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.SortedSet;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,17 +40,6 @@ public class Keys implements Iterable<Key>
return keys[indexOf];
}
- public Stream<Key> subSet(Key start, boolean isInclusiveStart, Key end, boolean isInclusiveEnd)
- {
- int i = Arrays.binarySearch(keys, start);
- if (i < 0) i = -1 -i;
- else if (!isInclusiveStart) ++i;
- int j = Arrays.binarySearch(keys, end);
- if (j < 0) j = -1 -j;
- else if (isInclusiveEnd) ++j;
- return Arrays.stream(keys, i, j);
- }
-
public Keys select(int[] indexes)
{
Key[] selection = new Key[indexes.length];
@@ -62,59 +48,31 @@ public class Keys implements Iterable<Key>
return new Keys(selection);
}
- public int size()
+ public boolean isEmpty()
{
- return keys.length;
+ return keys.length == 0;
}
- public int ceilIndex(int lowerBound, int upperBound, Key key)
+ public int size()
{
- int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
- if (i < 0) i = -1 - i;
- return i;
+ return keys.length;
}
- public int ceilIndex(Key key)
+ public int search(int lowerBound, int upperBound, Object key, Comparator<Object> comparator)
{
- return ceilIndex(0, keys.length, key);
+ return Arrays.binarySearch(keys, lowerBound, upperBound, key, comparator);
}
- public int higherIndex(int lowerBound, int upperBound, Key key)
+ public int ceilIndex(int lowerBound, int upperBound, Key key)
{
int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
if (i < 0) i = -1 - i;
- else ++i;
- return i;
- }
-
- public int higherIndex(Key key)
- {
- return higherIndex(0, keys.length, key);
- }
-
- public int floorIndex(int lowerBound, int upperBound, Key key)
- {
- int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
- if (i < 0) i = -2 - i;
- return i;
- }
-
- public int floorIndex(Key key)
- {
- return floorIndex(0, keys.length, key);
- }
-
- public int lowerIndex(int lowerBound, int upperBound, Key key)
- {
- int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
- if (i < 0) i = -2 - i;
- else --i;
return i;
}
- public int lowerIndex(Key key)
+ public int ceilIndex(Key key)
{
- return lowerIndex(0, keys.length, key);
+ return ceilIndex(0, keys.length, key);
}
public Stream<Key> stream()
diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java
index f41daeb..1249d97 100644
--- a/accord-core/src/main/java/accord/txn/Txn.java
+++ b/accord-core/src/main/java/accord/txn/Txn.java
@@ -3,13 +3,7 @@ package accord.txn;
import java.util.Comparator;
import java.util.stream.Stream;
-import accord.api.Data;
-import accord.api.Query;
-import accord.api.Read;
-import accord.api.Result;
-import accord.api.Update;
-import accord.api.Key;
-import accord.api.Store;
+import accord.api.*;
import accord.local.Command;
import accord.local.CommandsForKey;
import accord.local.Instance;
@@ -80,15 +74,15 @@ public class Txn
return "read:" + read.toString() + (update != null ? ", update:" + update : "");
}
- public Data read(Key start, Key end, Store store)
+ public Data read(KeyRange range, Store store)
{
- return read.read(start, end, store);
+ return read.read(range, store);
}
public Data read(Command command)
{
Instance instance = command.instance;
- return read(instance.shard.start, instance.shard.end, instance.store());
+ return read(instance.shard.range, instance.store());
}
// TODO: move these somewhere else?
diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java
index d17b8e6..4913e77 100644
--- a/accord-core/src/main/java/accord/txn/Writes.java
+++ b/accord-core/src/main/java/accord/txn/Writes.java
@@ -19,7 +19,7 @@ public class Writes
public void apply(Instance instance)
{
if (write != null)
- write.apply(instance.shard.start, instance.shard.end, executeAt, instance.store());
+ write.apply(instance.shard.range, executeAt, instance.store());
}
@Override
diff --git a/accord-core/src/main/java/accord/utils/KeyRange.java b/accord-core/src/main/java/accord/utils/KeyRange.java
deleted file mode 100644
index 6ade671..0000000
--- a/accord-core/src/main/java/accord/utils/KeyRange.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package accord.utils;
-
-import accord.api.Key;
-
-public class KeyRange<K extends Key<K>>
-{
- public final K start;
- public final K end;
-
- private KeyRange(K start, K end)
- {
- this.start = start;
- this.end = end;
- }
-
- public static <K extends Key<K>> KeyRange<K> of(K start, K end)
- {
- return new KeyRange<>(start, end);
- }
-}
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 79460e1..7d1c696 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -5,11 +5,19 @@ import java.util.List;
import java.util.zip.CRC32C;
import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
import accord.txn.Keys;
public class IntHashKey implements Key<IntHashKey>
{
+ private static class Range extends KeyRange.EndInclusive<IntHashKey>
+ {
+ public Range(IntHashKey start, IntHashKey end)
+ {
+ super(start, end);
+ }
+ }
+
public final int key;
public final int hash;
@@ -47,11 +55,6 @@ public class IntHashKey implements Key<IntHashKey>
return new Keys(keys);
}
- public static KeyRange<IntHashKey> range(int start, int end)
- {
- return KeyRange.of(key(start), key(end));
- }
-
public static KeyRange<IntHashKey>[] ranges(int count)
{
List<KeyRange<IntHashKey>> result = new ArrayList<>();
@@ -61,10 +64,10 @@ public class IntHashKey implements Key<IntHashKey>
for (int i = 1 ; i < count ; ++i)
{
IntHashKey next = new IntHashKey(Integer.MIN_VALUE, (int)Math.min(Integer.MAX_VALUE, start + i * delta));
- result.add(KeyRange.of(prev, next));
+ result.add(new Range(prev, next));
prev = next;
}
- result.add(KeyRange.of(prev, new IntHashKey(Integer.MIN_VALUE, Integer.MAX_VALUE)));
+ result.add(new Range(prev, new IntHashKey(Integer.MIN_VALUE, Integer.MAX_VALUE)));
return result.toArray(KeyRange[]::new);
}
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index c24472b..eb57364 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -2,16 +2,25 @@ package accord.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
import accord.txn.Keys;
public class IntKey implements Key<IntKey>
{
+ private static class Range extends KeyRange.EndInclusive<IntKey>
+ {
+ public Range(IntKey start, IntKey end)
+ {
+ super(start, end);
+ }
+ }
+
public final int key;
- private IntKey(int key)
+ public IntKey(int key)
{
this.key = key;
}
@@ -30,16 +39,21 @@ public class IntKey implements Key<IntKey>
public static Keys keys(int k0, int... kn)
{
Key[] keys = new Key[kn.length + 1];
- keys[0] = key(k0);
+ keys[0] = new IntKey(k0);
for (int i=0; i<kn.length; i++)
- keys[i + 1] = key(kn[i]);
+ keys[i + 1] = new IntKey(kn[i]);
return new Keys(keys);
}
+ public static KeyRange<IntKey> range(IntKey start, IntKey end)
+ {
+ return new Range(start, end);
+ }
+
public static KeyRange<IntKey> range(int start, int end)
{
- return KeyRange.of(key(start), key(end));
+ return range(key(start), key(end));
}
public static KeyRange<IntKey>[] ranges(int count)
@@ -51,10 +65,10 @@ public class IntKey implements Key<IntKey>
for (int i = 1 ; i < count ; ++i)
{
IntKey next = new IntKey((int)Math.min(Integer.MAX_VALUE, start + i * delta));
- result.add(KeyRange.of(prev, next));
+ result.add(new Range(prev, next));
prev = next;
}
- result.add(KeyRange.of(prev, IntKey.key(Integer.MAX_VALUE)));
+ result.add(new Range(prev, new IntKey(Integer.MAX_VALUE)));
return result.toArray(KeyRange[]::new);
}
@@ -63,4 +77,19 @@ public class IntKey implements Key<IntKey>
{
return Integer.toString(key);
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ IntKey intKey = (IntKey) o;
+ return key == intKey.key;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(key);
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index a8dad3b..ccdde7e 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -4,7 +4,7 @@ package accord.impl;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
import accord.topology.Shard;
import accord.topology.Shards;
import accord.utils.WrapAroundList;
@@ -42,7 +42,7 @@ public class TopologyFactory<K extends Key<K>>
final List<Shard> shards = new ArrayList<>();
for (int i = 0 ; i < ranges.length ; ++i)
- shards.add(new Shard(ranges[i].start, ranges[i].end, electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+ shards.add(new Shard(ranges[i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
return new Shards(shards.toArray(Shard[]::new));
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 3bed2dd..f70ffeb 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -1,11 +1,10 @@
package accord.impl.list;
-import accord.api.Data;
-import accord.api.Key;
-import accord.api.Store;
-import accord.api.Read;
+import accord.api.*;
import accord.txn.Keys;
+import static java.lang.Math.max;
+
public class ListRead implements Read
{
public final Keys keys;
@@ -16,11 +15,14 @@ public class ListRead implements Read
}
@Override
- public Data read(Key start, Key end, Store store)
+ public Data read(KeyRange range, Store store)
{
ListStore s = (ListStore)store;
ListData result = new ListData();
- for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+ int lowIdx = range.lowKeyIndex(keys);
+ if (lowIdx < 0)
+ return result;
+ for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
result.put(keys.get(i), s.get(keys.get(i)));
return result;
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index bc02241..20694f1 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -1,9 +1,11 @@
package accord.impl.list;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.TreeMap;
import accord.api.Key;
+import accord.api.KeyRange;
import accord.api.Store;
import accord.api.Write;
import accord.txn.Timestamp;
@@ -12,10 +14,12 @@ import accord.utils.Timestamped;
public class ListWrite extends TreeMap<Key, int[]> implements Write
{
@Override
- public void apply(Key start, Key end, Timestamp executeAt, Store store)
+ public void apply(KeyRange range, Timestamp executeAt, Store store)
{
ListStore s = (ListStore) store;
- for (Map.Entry<Key, int[]> e : subMap(start, true, end, false).entrySet())
+ NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(),
+ range.end(), range.endInclusive());
+ for (Map.Entry<Key, int[]> e : selection.entrySet())
s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
}
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index d407531..8580448 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -3,10 +3,9 @@ package accord.impl.mock;
import accord.NetworkFilter;
import accord.local.Node;
import accord.local.Node.Id;
-import accord.messages.*;
+import accord.messages.Timeout;
import accord.utils.ThreadPoolScheduler;
import accord.txn.TxnId;
-import accord.utils.KeyRange;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
@@ -85,7 +84,7 @@ public class MockCluster implements Network
Id nextId = nextNodeId();
ids.add(nextId);
}
- TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, KeyRange.of(IntKey.key(0), IntKey.key(config.maxKey)));
+ TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, IntKey.range(0, config.maxKey));
Shards topology = topologyFactory.toShards(ids);
for (int i=0; i<config.initialNodes; i++)
{
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 3a27958..36b2271 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -19,8 +19,8 @@ public class MockStore implements Store
};
public static final Result RESULT = new Result() {};
- public static final Read READ = (start, end, store) -> DATA;
+ public static final Read READ = (range, store) -> DATA;
public static final Query QUERY = data -> RESULT;
- public static final Write WRITE = (start, end, executeAt, store) -> {};
+ public static final Write WRITE = (range, executeAt, store) -> {};
public static final Update UPDATE = data -> WRITE;
}
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
new file mode 100644
index 0000000..76a1fa9
--- /dev/null
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -0,0 +1,77 @@
+package accord.topology;
+
+import accord.Utils;
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.impl.TopologyFactory;
+import accord.local.Node;
+import accord.txn.Keys;
+import com.google.common.collect.Iterables;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static accord.impl.IntKey.key;
+import static accord.impl.IntKey.range;
+
+public class TopologyTest
+{
+
+ private static void assertRangeForKey(Topology topology, int key, int start, int end)
+ {
+ Key expectedKey = key(key);
+ Shard shard = topology.forKey(key(key));
+ KeyRange expectedRange = range(start, end);
+ Assertions.assertTrue(expectedRange.containsKey(expectedKey));
+ Assertions.assertTrue(shard.range.containsKey(expectedKey));
+ Assertions.assertEquals(expectedRange, shard.range);
+
+ Shards shards = topology.forKeys(Keys.of(expectedKey));
+ shard = Iterables.getOnlyElement(shards);
+ Assertions.assertTrue(shard.range.containsKey(expectedKey));
+ Assertions.assertEquals(expectedRange, shard.range);
+ }
+
+ private static Topology topology(List<Node.Id> ids, int rf, KeyRange... ranges)
+ {
+ TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(rf, ranges);
+ return topologyFactory.toShards(ids);
+ }
+
+ private static Topology topology(int numNodes, int rf, KeyRange... ranges)
+ {
+ return topology(Utils.ids(numNodes), rf, ranges);
+ }
+
+ private static Topology topology(KeyRange... ranges)
+ {
+ return topology(1, 1, ranges);
+ }
+
+ private static KeyRange<IntKey> r(int start, int end)
+ {
+ return IntKey.range(start, end);
+ }
+
+ @Test
+ void forKeyTest()
+ {
+ Topology topology = topology(r(0, 100), r(100, 200), r(200, 300));
+ assertRangeForKey(topology, 50, 0, 100);
+ assertRangeForKey(topology, 100, 0, 100);
+ }
+
+ @Test
+ void forRangesTest()
+ {
+
+ }
+
+ @Test
+ void sequentialRanges()
+ {
+ // TODO: confirm non-sequential ranges are handled properly
+ }
+}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangeTest.java b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
new file mode 100644
index 0000000..b6ce4a4
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
@@ -0,0 +1,150 @@
+package accord.utils;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.txn.Keys;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KeyRangeTest
+{
+ static IntKey k(int v)
+ {
+ return new IntKey(v);
+ }
+
+ static KeyRange<IntKey> rangeEndIncl(int start, int end)
+ {
+ return new KeyRange.EndInclusive<>(k(start), k(end)) {};
+ }
+
+ static KeyRange<IntKey> rangeStartIncl(int start, int end)
+ {
+ return new KeyRange.StartInclusive<>(k(start), k(end)) {};
+ }
+
+ static Keys keys(int... values)
+ {
+ Key[] keys = new Key[values.length];
+ for (int i=0; i<values.length; i++)
+ keys[i] = IntKey.key(values[i]);
+ return new Keys(keys);
+ }
+
+ private static void assertInvalidKeyRange(int start, int end)
+ {
+ try
+ {
+ rangeStartIncl(start, end);
+ Assertions.fail("Expected IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ try
+ {
+ rangeEndIncl(start, end);
+ Assertions.fail("Expected IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+ }
+
+ @Test
+ void invalidRangeTest()
+ {
+ assertInvalidKeyRange(1, 1);
+ assertInvalidKeyRange(2, 1);
+ }
+
+ @Test
+ void containsTest()
+ {
+ KeyRange<IntKey> endInclRange = rangeEndIncl(10, 20);
+ Assertions.assertFalse(endInclRange.containsKey(k(10)));
+ Assertions.assertFalse(endInclRange.startInclusive());
+ Assertions.assertTrue(endInclRange.containsKey(k(20)));
+ Assertions.assertTrue(endInclRange.endInclusive());
+
+ KeyRange<IntKey> startInclRange = rangeStartIncl(10, 20);
+ Assertions.assertTrue(startInclRange.containsKey(k(10)));
+ Assertions.assertTrue(startInclRange.startInclusive());
+ Assertions.assertFalse(startInclRange.containsKey(k(20)));
+ Assertions.assertFalse(startInclRange.endInclusive());
+ }
+
+ private static void assertHigherKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+ {
+ if (expectedIdx > 0 && expectedIdx < keys.size())
+ Assertions.assertTrue(range.containsKey(keys.get(expectedIdx - 1)));
+ int actualIdx = range.higherKeyIndex(keys);
+ Assertions.assertEquals(expectedIdx, actualIdx);
+ }
+
+ @Test
+ void higherKeyIndexTest()
+ {
+ Keys keys = keys(10, 11, 12, 13, 14, 15, 16);
+ assertHigherKeyIndex(0, rangeEndIncl(0, 9), keys);
+ assertHigherKeyIndex(0, rangeStartIncl(0, 10), keys);
+ assertHigherKeyIndex(0, rangeEndIncl(0, 5), keys);
+ assertHigherKeyIndex(0, rangeStartIncl(0, 5), keys);
+
+ assertHigherKeyIndex(1, rangeEndIncl(9, 10), keys);
+ assertHigherKeyIndex(0, rangeStartIncl(9, 10), keys);
+ assertHigherKeyIndex(5, rangeEndIncl(11, 14), keys);
+ assertHigherKeyIndex(4, rangeStartIncl(11, 14), keys);
+ assertHigherKeyIndex(6, rangeEndIncl(11, 15), keys);
+ assertHigherKeyIndex(5, rangeStartIncl(11, 15), keys);
+
+ assertHigherKeyIndex(7, rangeEndIncl(16, 25), keys);
+ assertHigherKeyIndex(7, rangeStartIncl(16, 25), keys);
+ assertHigherKeyIndex(7, rangeEndIncl(20, 25), keys);
+ assertHigherKeyIndex(7, rangeStartIncl(20, 25), keys);
+ }
+
+ private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+ {
+ if (expectedIdx >= 0 && expectedIdx < keys.size())
+ {
+ Assertions.assertTrue(range.containsKey(keys.get(expectedIdx)));
+ }
+ else
+ {
+ Assertions.assertFalse(range.containsKey(keys.get(0)));
+ Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+ }
+
+ int actualIdx = range.lowKeyIndex(keys);
+ Assertions.assertEquals(expectedIdx, actualIdx);
+ }
+
+ @Test
+ void lowKeyIndexTest()
+ {
+ Keys keys = keys(10, 11, 12, 13, 14, 15, 16);
+ assertLowKeyIndex(-1, rangeEndIncl(0, 5), keys);
+ assertLowKeyIndex(-1, rangeStartIncl(0, 5), keys);
+ assertLowKeyIndex(-1, rangeEndIncl(0, 9), keys);
+ assertLowKeyIndex(-1, rangeStartIncl(0, 9), keys);
+
+ assertLowKeyIndex(0, rangeEndIncl(5, 10), keys);
+ assertLowKeyIndex(-1, rangeStartIncl(5, 10), keys);
+ assertLowKeyIndex(2, rangeEndIncl(11, 15), keys);
+ assertLowKeyIndex(1, rangeStartIncl(11, 15), keys);
+ assertLowKeyIndex(3, rangeEndIncl(12, 14), keys);
+ assertLowKeyIndex(2, rangeStartIncl(12, 14), keys);
+ assertLowKeyIndex(6, rangeEndIncl(15, 20), keys);
+ assertLowKeyIndex(5, rangeStartIncl(15, 20), keys);
+
+ assertLowKeyIndex(7, rangeEndIncl(16, 20), keys);
+ assertLowKeyIndex(6, rangeStartIncl(16, 20), keys);
+ assertLowKeyIndex(7, rangeEndIncl(20, 25), keys);
+ assertLowKeyIndex(7, rangeStartIncl(20, 25), keys);
+ }
+}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangesTest.java b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
new file mode 100644
index 0000000..4b3990e
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
@@ -0,0 +1,31 @@
+package accord.utils;
+
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.topology.KeyRanges;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KeyRangesTest
+{
+ private static KeyRange<IntKey> r(int start, int end)
+ {
+ return IntKey.range(start, end);
+ }
+
+ private static KeyRanges ranges(KeyRange... ranges)
+ {
+ return new KeyRanges(ranges);
+ }
+
+ @Test
+ void rangeIndexForKeyTest()
+ {
+ KeyRanges ranges = ranges(r(100, 200), r(300, 400));
+ Assertions.assertEquals(-1, ranges.rangeIndexForKey(IntKey.key(50)));
+ Assertions.assertEquals(0, ranges.rangeIndexForKey(IntKey.key(150)));
+ Assertions.assertEquals(-2, ranges.rangeIndexForKey(IntKey.key(250)));
+ Assertions.assertEquals(1, ranges.rangeIndexForKey(IntKey.key(350)));
+ Assertions.assertEquals(-3, ranges.rangeIndexForKey(IntKey.key(450)));
+ }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index a16ccd2..208df48 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -3,12 +3,21 @@ package accord.maelstrom;
import java.io.IOException;
import accord.api.Key;
+import accord.api.KeyRange;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
public class MaelstromKey extends Datum<MaelstromKey> implements Key<MaelstromKey>
{
+ public static class Range extends KeyRange.EndInclusive<MaelstromKey>
+ {
+ public Range(MaelstromKey start, MaelstromKey end)
+ {
+ super(start, end);
+ }
+ }
+
public MaelstromKey(Kind kind, Object value)
{
super(kind, value);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 5a2630f..7773ba8 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -1,11 +1,10 @@
package accord.maelstrom;
-import accord.api.Data;
-import accord.api.Key;
-import accord.api.Store;
-import accord.api.Read;
+import accord.api.*;
import accord.txn.Keys;
+import static java.lang.Math.max;
+
public class MaelstromRead implements Read
{
final Keys keys;
@@ -16,11 +15,14 @@ public class MaelstromRead implements Read
}
@Override
- public Data read(Key start, Key end, Store store)
+ public Data read(KeyRange range, Store store)
{
MaelstromStore s = (MaelstromStore)store;
MaelstromData result = new MaelstromData();
- for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+ int lowIdx = range.lowKeyIndex(keys);
+ if (lowIdx < 0)
+ return result;
+ for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
result.put(keys.get(i), s.get(keys.get(i)));
return result;
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 5adbd9d..31ec395 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -1,9 +1,11 @@
package accord.maelstrom;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.TreeMap;
import accord.api.Key;
+import accord.api.KeyRange;
import accord.api.Store;
import accord.api.Write;
import accord.txn.Timestamp;
@@ -12,10 +14,12 @@ import accord.utils.Timestamped;
public class MaelstromWrite extends TreeMap<Key, Value> implements Write
{
@Override
- public void apply(Key start, Key end, Timestamp executeAt, Store store)
+ public void apply(KeyRange range, Timestamp executeAt, Store store)
{
MaelstromStore s = (MaelstromStore) store;
- for (Map.Entry<Key, Value> e : subMap(start, true, end, false).entrySet())
+ NavigableMap<Key, Value> selection = subMap(range.start(), range.startInclusive(),
+ range.end(), range.endInclusive());
+ for (Map.Entry<Key, Value> e : selection.entrySet())
s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
index bfa8e5e..f9dcdc1 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
@@ -6,6 +6,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import accord.api.KeyRange;
import accord.local.Node.Id;
import accord.maelstrom.Datum.Kind;
import accord.topology.Shard;
@@ -18,22 +19,24 @@ public class TopologyFactory
final int shards;
final int rf;
final Kind[] kinds;
- final MaelstromKey[][] starts, ends;
+ final KeyRange<MaelstromKey>[][] ranges;
public TopologyFactory(int shards, int rf)
{
this.shards = shards;
this.rf = rf;
this.kinds = Datum.COMPARE_BY_HASH ? new Kind[] { Kind.HASH } : new Kind[] { Kind.STRING, Kind.LONG, Kind.DOUBLE };
- this.starts = new MaelstromKey[kinds.length][shards];
- this.ends = new MaelstromKey[kinds.length][shards];
+ this.ranges = new MaelstromKey.Range[kinds.length][shards];
for (int i = 0 ; i < kinds.length ; ++i)
{
Kind kind = kinds[i];
- starts[i] = kind.split(shards);
- ends[i] = new MaelstromKey[shards];
- System.arraycopy(starts[i], 1, ends[i], 0, shards - 1);
- ends[i][shards - 1] = new MaelstromKey(kind, null);
+ MaelstromKey[] starts = kind.split(shards);
+ MaelstromKey[] ends = new MaelstromKey[shards];
+ System.arraycopy(starts, 1, ends, 0, shards - 1);
+ ends[shards - 1] = new MaelstromKey(kind, null);
+ this.ranges[i] = new MaelstromKey.Range[shards];
+ for (int j=0; j<shards; j++)
+ ranges[i][j] = new MaelstromKey.Range(starts[j], ends[j]);
}
}
@@ -59,7 +62,7 @@ public class TopologyFactory
for (int j = 0 ; j < kinds.length ; ++j)
{
for (int i = 0 ; i < this.shards ; ++i)
- shards.add(new Shard(starts[j][i], ends[j][i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+ shards.add(new Shard(ranges[j][i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
}
return new Shards(shards.toArray(Shard[]::new));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org