You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/29 11:29:43 UTC

[cassandra-accord] 02/02: KeyRange refactor

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 210ff37b75fc07603b5f655171f6c49cf75805da
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Sep 14 13:52:17 2021 -0700

    KeyRange refactor
---
 accord-core/src/main/java/accord/api/KeyRange.java | 163 +++++++++++++++++++++
 accord-core/src/main/java/accord/api/Read.java     |   2 +-
 accord-core/src/main/java/accord/api/Write.java    |   2 +-
 .../src/main/java/accord/topology/KeyRanges.java   |  48 ++++++
 .../src/main/java/accord/topology/Shard.java       |  13 +-
 .../src/main/java/accord/topology/Shards.java      |  16 +-
 .../src/main/java/accord/topology/Topology.java    | 118 +++++----------
 accord-core/src/main/java/accord/txn/Keys.java     |  62 ++------
 accord-core/src/main/java/accord/txn/Txn.java      |  14 +-
 accord-core/src/main/java/accord/txn/Writes.java   |   2 +-
 .../src/main/java/accord/utils/KeyRange.java       |  20 ---
 .../src/test/java/accord/impl/IntHashKey.java      |  19 ++-
 accord-core/src/test/java/accord/impl/IntKey.java  |  43 +++++-
 .../src/test/java/accord/impl/TopologyFactory.java |   4 +-
 .../src/test/java/accord/impl/list/ListRead.java   |  14 +-
 .../src/test/java/accord/impl/list/ListWrite.java  |   8 +-
 .../test/java/accord/impl/mock/MockCluster.java    |   5 +-
 .../src/test/java/accord/impl/mock/MockStore.java  |   4 +-
 .../test/java/accord/topology/TopologyTest.java    |  77 ++++++++++
 .../src/test/java/accord/utils/KeyRangeTest.java   | 150 +++++++++++++++++++
 .../src/test/java/accord/utils/KeyRangesTest.java  |  31 ++++
 .../main/java/accord/maelstrom/MaelstromKey.java   |   9 ++
 .../main/java/accord/maelstrom/MaelstromRead.java  |  14 +-
 .../main/java/accord/maelstrom/MaelstromWrite.java |   8 +-
 .../java/accord/maelstrom/TopologyFactory.java     |  19 ++-
 25 files changed, 644 insertions(+), 221 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/KeyRange.java b/accord-core/src/main/java/accord/api/KeyRange.java
new file mode 100644
index 0000000..9e6b850
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/KeyRange.java
@@ -0,0 +1,163 @@
+package accord.api;
+
+import accord.txn.Keys;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * A range of keys
+ * @param <K>
+ */
+public abstract class KeyRange<K extends Key<K>>
+{
+    public static abstract class EndInclusive<K extends Key<K>> extends KeyRange<K>
+    {
+        public EndInclusive(K start, K end)
+        {
+            super(start, end);
+        }
+
+        @Override
+        public int compareKey(K key)
+        {
+            if (key.compareTo(start()) <= 0)
+                return -1;
+            if (key.compareTo(end()) > 0)
+                return 1;
+            return 0;
+        }
+
+        @Override
+        public boolean startInclusive()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean endInclusive()
+        {
+            return true;
+        }
+    }
+
+    public static abstract class StartInclusive<K extends Key<K>> extends KeyRange<K>
+    {
+        public StartInclusive(K start, K end)
+        {
+            super(start, end);
+        }
+
+        @Override
+        public int compareKey(K key)
+        {
+            if (key.compareTo(start()) < 0)
+                return -1;
+            if (key.compareTo(end()) >= 0)
+                return 1;
+            return 0;
+        }
+
+        @Override
+        public boolean startInclusive()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean endInclusive()
+        {
+            return false;
+        }
+    }
+
+    private final K start;
+    private final K end;
+
+    private KeyRange(K start, K end)
+    {
+        Preconditions.checkArgument(start.compareTo(end) < 0);
+        this.start = start;
+        this.end = end;
+    }
+
+    public final K start()
+    {
+        return start;
+    }
+
+    public final K end()
+    {
+        return end;
+    }
+
+    public abstract boolean startInclusive();
+
+    public abstract boolean endInclusive();
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KeyRange<?> that = (KeyRange<?>) o;
+        return Objects.equals(start, that.start) && Objects.equals(end, that.end);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(start, end);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Range[" + start + ", " + end + ']';
+    }
+
+    /**
+     * Returns a negative integer, zero, or a positive integer as the provided key is less than, contained by,
+     * or greater than this range.
+     */
+    public abstract int compareKey(K key);
+
+    public boolean containsKey(K key)
+    {
+        return compareKey(key) == 0;
+    }
+
+    /**
+     * returns the index of the first key larger than what's covered by this range
+     */
+    public int higherKeyIndex(Keys keys, int lowerBound, int upperBound)
+    {
+        int i = keys.search(lowerBound, upperBound, this,
+                            (k, r) -> ((KeyRange) r).compareKey((Key) k) <= 0 ? -1 : 1);
+        if (i < 0) i = -1 - i;
+        return i;
+    }
+
+    public int higherKeyIndex(Keys keys)
+    {
+        return higherKeyIndex(keys, 0, keys.size());
+    }
+
+    /**
+     * returns the index of the lowest key contained in this range
+     * @param keys
+     */
+    public int lowKeyIndex(Keys keys)
+    {
+        if (keys.isEmpty()) return -1;
+
+        int i = keys.search(0, keys.size(), this,
+                            (k, r) -> ((KeyRange) r).compareKey((Key) k) < 0 ? -1 : 1);
+
+        if (i < 0) i = -1 - i;
+
+        if (i == 0 && !containsKey((K) keys.get(0))) i = -1;
+
+        return i;
+    }
+}
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index ff4fd2f..2149418 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -7,5 +7,5 @@ package accord.api;
  */
 public interface Read
 {
-    Data read(Key start, Key end, Store store);
+    Data read(KeyRange range, Store store);
 }
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index 4404ee6..eba020c 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -9,5 +9,5 @@ import accord.txn.Timestamp;
  */
 public interface Write
 {
-    void apply(Key start, Key end, Timestamp executeAt, Store store);
+    void apply(KeyRange range, Timestamp executeAt, Store store);
 }
diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java
new file mode 100644
index 0000000..90ed527
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/KeyRanges.java
@@ -0,0 +1,48 @@
+package accord.topology;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+
+import java.util.Arrays;
+
+public class KeyRanges
+{
+    public static final KeyRanges EMPTY = new KeyRanges(new KeyRange[0]);
+
+    private final KeyRange[] ranges;
+
+    public KeyRanges(KeyRange[] ranges)
+    {
+        this.ranges = ranges;
+    }
+
+    @Override
+    public String toString()
+    {
+        return Arrays.toString(ranges);
+    }
+
+    public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
+    {
+        return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
+                                   (r, k) -> -((KeyRange) r).compareKey((Key) k));
+    }
+
+    public int rangeIndexForKey(Key key)
+    {
+        return rangeIndexForKey(0, ranges.length, key);
+    }
+
+    public int size()
+    {
+        return ranges.length;
+    }
+
+    public KeyRanges select(int[] indexes)
+    {
+        KeyRange[] selection = new KeyRange[indexes.length];
+        for (int i=0; i<indexes.length; i++)
+            selection[i] = ranges[i];
+        return new KeyRanges(selection);
+    }
+}
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index 925308c..fcd7ee2 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -3,6 +3,7 @@ package accord.topology;
 import java.util.List;
 import java.util.Set;
 
+import accord.api.KeyRange;
 import accord.local.Node.Id;
 import accord.api.Key;
 import com.google.common.annotations.VisibleForTesting;
@@ -10,17 +11,16 @@ import com.google.common.base.Preconditions;
 
 public class Shard
 {
-    public final Key start, end;
+    public final KeyRange range;
     public final List<Id> nodes;
     public final Set<Id> fastPathElectorate;
     public final int recoveryFastPathSize;
     public final int fastPathQuorumSize;
     public final int slowPathQuorumSize;
 
-    public Shard(Key start, Key end, List<Id> nodes, Set<Id> fastPathElectorate)
+    public Shard(KeyRange range, List<Id> nodes, Set<Id> fastPathElectorate)
     {
-        this.start = start;
-        this.end = end;
+        this.range = range;
         this.nodes = nodes;
         int f = maxToleratedFailures(nodes.size());
         this.fastPathElectorate = fastPathElectorate;
@@ -40,18 +40,17 @@ public class Shard
     static int fastPathQuorumSize(int replicas, int electorate, int f)
     {
         Preconditions.checkArgument(electorate >= replicas - f);
-//        return (fastPathElectorateSize + f + 1 + 1) / 2;
         return (f + electorate)/2 + 1;
     }
 
     public boolean contains(Key key)
     {
-        return key.compareTo(start) >= 0 && key.compareTo(end) < 0;
+        return range.containsKey(key);
     }
 
     @Override
     public String toString()
     {
-        return "Shard[" + start + ',' + end + ']';
+        return "Shard[" + range.start() + ',' + range.end() + ']';
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/Shards.java b/accord-core/src/main/java/accord/topology/Shards.java
index 6c5eadf..4c46f1b 100644
--- a/accord-core/src/main/java/accord/topology/Shards.java
+++ b/accord-core/src/main/java/accord/topology/Shards.java
@@ -3,20 +3,30 @@ package accord.topology;
 import java.util.Collections;
 import java.util.Map;
 
+import accord.api.KeyRange;
 import accord.local.Node.Id;
 import accord.txn.Keys;
 
 public class Shards extends Topology
 {
-    public static final Shards EMPTY = new Shards(Keys.EMPTY, new Shard[0], Collections.emptyMap(), Keys.EMPTY, new int[0]);
+    public static final Shards EMPTY = new Shards(new Shard[0], KeyRanges.EMPTY, Collections.emptyMap(), KeyRanges.EMPTY.EMPTY, new int[0]);
 
     public Shards(Shard... shards)
     {
         super(shards);
     }
 
-    public Shards(Keys starts, Shard[] shards, Map<Id, NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+    public Shards(Shard[] shards, KeyRanges ranges, Map<Id, NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
     {
-        super(starts, shards, nodeLookup, subsetOfStarts, supersetIndexes);
+        super(shards, ranges, nodeLookup, subsetOfRanges, supersetIndexes);
+    }
+
+    public static Shards select(Shard[] shards, int[] indexes)
+    {
+        Shard[] subset = new Shard[indexes.length];
+        for (int i=0; i<indexes.length; i++)
+            subset[i] = shards[indexes[i]];
+
+        return new Shards(subset);
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index c3308b7..f568307 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -8,9 +8,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.IntFunction;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import accord.api.KeyRange;
 import accord.local.Node.Id;
 import accord.api.Key;
 import accord.txn.Keys;
@@ -18,36 +18,35 @@ import accord.utils.IndexedConsumer;
 
 public class Topology extends AbstractCollection<Shard>
 {
-    // TODO: introduce range version of Keys
-    final Keys starts;
     final Shard[] shards;
+    final KeyRanges ranges;
     final Map<Id, Shards.NodeInfo> nodeLookup;
-    final Keys subsetOfStarts;
+    final KeyRanges subsetOfRanges;
     final int[] supersetIndexes;
 
     static class NodeInfo
     {
-        final Keys starts;
+        final KeyRanges ranges;
         final int[] supersetIndexes;
 
-        NodeInfo(Keys starts, int[] supersetIndexes)
+        NodeInfo(KeyRanges ranges, int[] supersetIndexes)
         {
-            this.starts = starts;
+            this.ranges = ranges;
             this.supersetIndexes = supersetIndexes;
         }
 
         @Override
         public String toString()
         {
-            return starts.toString();
+            return ranges.toString();
         }
     }
 
     public Topology(Shard... shards)
     {
-        this.starts = new Keys(Arrays.stream(shards).map(shard -> shard.start).sorted().collect(Collectors.toList()));
+        this.ranges = new KeyRanges(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
         this.shards = shards;
-        this.subsetOfStarts = starts;
+        this.subsetOfRanges = ranges;
         this.supersetIndexes = IntStream.range(0, shards.length).toArray();
         this.nodeLookup = new HashMap<>();
         Map<Id, List<Integer>> build = new HashMap<>();
@@ -59,17 +58,17 @@ public class Topology extends AbstractCollection<Shard>
         for (Map.Entry<Id, List<Integer>> e : build.entrySet())
         {
             int[] supersetIndexes = e.getValue().stream().mapToInt(i -> i).toArray();
-            Keys starts = this.starts.select(supersetIndexes);
-            nodeLookup.put(e.getKey(), new Shards.NodeInfo(starts, supersetIndexes));
+            KeyRanges ranges = this.ranges.select(supersetIndexes);
+            nodeLookup.put(e.getKey(), new Shards.NodeInfo(ranges, supersetIndexes));
         }
     }
 
-    public Topology(Keys starts, Shard[] shards, Map<Id, Shards.NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+    public Topology(Shard[] shards, KeyRanges ranges, Map<Id, Shards.NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
     {
-        this.starts = starts;
         this.shards = shards;
+        this.ranges = ranges;
         this.nodeLookup = nodeLookup;
-        this.subsetOfStarts = subsetOfStarts;
+        this.subsetOfRanges = subsetOfRanges;
         this.supersetIndexes = supersetIndexes;
     }
 
@@ -78,12 +77,14 @@ public class Topology extends AbstractCollection<Shard>
         NodeInfo info = nodeLookup.get(node);
         if (info == null)
             return Shards.EMPTY;
-        return forKeys(info.starts);
+        return Shards.select(shards, info.supersetIndexes);
     }
 
     public Shard forKey(Key key)
     {
-        int i = starts.floorIndex(key);
+        int i = ranges.rangeIndexForKey(key);
+        if (i < 0 || i >= ranges.size())
+            throw new IllegalArgumentException("Range not found for " + key);
         return shards[i];
     }
 
@@ -91,19 +92,23 @@ public class Topology extends AbstractCollection<Shard>
     {
         int subsetIndex = 0;
         int count = 0;
-        int[] newSubset = new int[Math.min(select.size(), subsetOfStarts.size())];
+        int[] newSubset = new int[Math.min(select.size(), subsetOfRanges.size())];
         for (int i = 0 ; i < select.size() ; )
         {
-            subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
+            // find the range containing the key at i
+            subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
+            if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
+                throw new IllegalArgumentException("Range not found for " + select.get(i));
             int supersetIndex = supersetIndexes[subsetIndex];
             newSubset[count++] = supersetIndex;
             Shard shard = shards[supersetIndex];
-            i = select.ceilIndex(i, select.size(), shard.end);
+            // find the first key outside this range
+            i = shard.range.higherKeyIndex(select, i, select.size());
         }
         if (count != newSubset.length)
             newSubset = Arrays.copyOf(newSubset, count);
-        Keys subsetOfKeys = starts.select(newSubset);
-        return new Shards(starts, shards, nodeLookup, subsetOfKeys, newSubset);
+        KeyRanges rangeSubset = ranges.select(newSubset);
+        return new Shards(shards, ranges, nodeLookup, rangeSubset, newSubset);
     }
 
     /**
@@ -113,21 +118,6 @@ public class Topology extends AbstractCollection<Shard>
     public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
     {
         Shards.NodeInfo info = nodeLookup.get(on);
-//        int nodeIndex = 0;
-//        int subsetIndex = 0;
-//        for (int i = select.ceilIndex(info.starts.get(0)) ; i < select.size() ; )
-//        {
-//            nodeIndex = info.starts.floorIndex(nodeIndex, info.starts.size(), select.get(i));
-//            int supersetIndex = info.supersetIndexes[nodeIndex];
-//            Shard shard = shards[supersetIndex];
-//            if (shard.end.compareTo(select.get(i)) > 0)
-//            {
-//                subsetIndex = Arrays.binarySearch(supersetIndexes, subsetIndex, supersetIndexes.length, supersetIndex);
-//                consumer.accept(subsetIndex, shard);
-//            }
-//            i = select.ceilIndex(i + 1, select.size(), shard.end);
-//        }
-
         for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
         {
             Key key = select.get(i);
@@ -135,9 +125,13 @@ public class Topology extends AbstractCollection<Shard>
             int c = supersetIndexes[j] - info.supersetIndexes[k];
             if (c < 0) ++j;
             else if (c > 0) ++k;
-            else if (key.compareTo(shard.start) < 0) ++i;
-            else if (key.compareTo(shard.end) < 0) { consumer.accept(j, shard); i++; j++; k++; }
-            else { j++; k++; }
+            else
+            {
+                int rcmp = shard.range.compareKey(key);
+                if (rcmp < 0) ++i;
+                else if (rcmp == 0) { consumer.accept(j, shard); i++; j++; k++; }
+                else { j++; k++; }
+            }
         }
     }
 
@@ -173,53 +167,17 @@ public class Topology extends AbstractCollection<Shard>
             consumer.accept(i, shards[supersetIndexes[i]]);
     }
 
-
     public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
     {
         List<T> selection = new ArrayList<>();
-//        int subsetIndex = 0;
-//        for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-//        {
-//            subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
-//            selection.add(indexedByShard[subsetIndex]);
-//            Shard shard = shards[supersetIndexes[subsetIndex]];
-//            i = select.ceilIndex(i + 1, select.size(), shard.end);
-//        }
-
-//        int minSubsetIndex = 0;
-//        for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-//        {
-//            int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
-//            selection.add(indexedByShard[subsetIndex]);
-//            minSubsetIndex = subsetIndex + 1;
-//            if (minSubsetIndex == supersetIndexes.length)
-//                break;
-//            Shard shard = shards[supersetIndexes[minSubsetIndex]];
-//            i = select.ceilIndex(i + 1, select.size(), shard.start);
-//        }
-
-//        int minSubsetIndex = 0;
-//        for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
-//        {
-//            int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
-//            Shard shard = shards[supersetIndexes[subsetIndex]];
-//            if (shard.end.compareTo(select.get(i)) > 0)
-//                selection.add(indexedByShard[subsetIndex]);
-//            minSubsetIndex = subsetIndex + 1;
-//            if (minSubsetIndex == supersetIndexes.length)
-//                break;
-//
-//            shard = shards[supersetIndexes[minSubsetIndex]];
-//            i = select.ceilIndex(i + 1, select.size(), shard.start);
-//        }
-
         for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
         {
             Key k = select.get(i);
             Shard shard = shards[supersetIndexes[j]];
-            int c = k.compareTo(shard.start);
+
+            int c = shard.range.compareKey(k);
             if (c < 0) ++i;
-            else if (k.compareTo(shard.end) < 0) { selection.add(indexedByShard[j++]); i++; }
+            else if (c == 0) { selection.add(indexedByShard[j++]); i++; }
             else j++;
         }
 
@@ -235,7 +193,7 @@ public class Topology extends AbstractCollection<Shard>
     @Override
     public int size()
     {
-        return subsetOfStarts.size();
+        return subsetOfRanges.size();
     }
 
     public Shard get(int index)
diff --git a/accord-core/src/main/java/accord/txn/Keys.java b/accord-core/src/main/java/accord/txn/Keys.java
index 4e49a8c..f72d419 100644
--- a/accord-core/src/main/java/accord/txn/Keys.java
+++ b/accord-core/src/main/java/accord/txn/Keys.java
@@ -1,9 +1,6 @@
 package accord.txn;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.SortedSet;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -43,17 +40,6 @@ public class Keys implements Iterable<Key>
         return keys[indexOf];
     }
 
-    public Stream<Key> subSet(Key start, boolean isInclusiveStart, Key end, boolean isInclusiveEnd)
-    {
-        int i = Arrays.binarySearch(keys, start);
-        if (i < 0) i = -1 -i;
-        else if (!isInclusiveStart) ++i;
-        int j = Arrays.binarySearch(keys, end);
-        if (j < 0) j = -1 -j;
-        else if (isInclusiveEnd) ++j;
-        return Arrays.stream(keys, i, j);
-    }
-
     public Keys select(int[] indexes)
     {
         Key[] selection = new Key[indexes.length];
@@ -62,59 +48,31 @@ public class Keys implements Iterable<Key>
         return new Keys(selection);
     }
 
-    public int size()
+    public boolean isEmpty()
     {
-        return keys.length;
+        return keys.length == 0;
     }
 
-    public int ceilIndex(int lowerBound, int upperBound, Key key)
+    public int size()
     {
-        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
-        if (i < 0) i = -1 - i;
-        return i;
+        return keys.length;
     }
 
-    public int ceilIndex(Key key)
+    public int search(int lowerBound, int upperBound, Object key, Comparator<Object> comparator)
     {
-        return ceilIndex(0, keys.length, key);
+        return Arrays.binarySearch(keys, lowerBound, upperBound, key, comparator);
     }
 
-    public int higherIndex(int lowerBound, int upperBound, Key key)
+    public int ceilIndex(int lowerBound, int upperBound, Key key)
     {
         int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
         if (i < 0) i = -1 - i;
-        else ++i;
-        return i;
-    }
-
-    public int higherIndex(Key key)
-    {
-        return higherIndex(0, keys.length, key);
-    }
-
-    public int floorIndex(int lowerBound, int upperBound, Key key)
-    {
-        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
-        if (i < 0) i = -2 - i;
-        return i;
-    }
-
-    public int floorIndex(Key key)
-    {
-        return floorIndex(0, keys.length, key);
-    }
-
-    public int lowerIndex(int lowerBound, int upperBound, Key key)
-    {
-        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
-        if (i < 0) i = -2 - i;
-        else --i;
         return i;
     }
 
-    public int lowerIndex(Key key)
+    public int ceilIndex(Key key)
     {
-        return lowerIndex(0, keys.length, key);
+        return ceilIndex(0, keys.length, key);
     }
 
     public Stream<Key> stream()
diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java
index f41daeb..1249d97 100644
--- a/accord-core/src/main/java/accord/txn/Txn.java
+++ b/accord-core/src/main/java/accord/txn/Txn.java
@@ -3,13 +3,7 @@ package accord.txn;
 import java.util.Comparator;
 import java.util.stream.Stream;
 
-import accord.api.Data;
-import accord.api.Query;
-import accord.api.Read;
-import accord.api.Result;
-import accord.api.Update;
-import accord.api.Key;
-import accord.api.Store;
+import accord.api.*;
 import accord.local.Command;
 import accord.local.CommandsForKey;
 import accord.local.Instance;
@@ -80,15 +74,15 @@ public class Txn
         return "read:" + read.toString() + (update != null ? ", update:" + update : "");
     }
 
-    public Data read(Key start, Key end, Store store)
+    public Data read(KeyRange range, Store store)
     {
-        return read.read(start, end, store);
+        return read.read(range, store);
     }
 
     public Data read(Command command)
     {
         Instance instance = command.instance;
-        return read(instance.shard.start, instance.shard.end, instance.store());
+        return read(instance.shard.range, instance.store());
     }
 
     // TODO: move these somewhere else?
diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java
index d17b8e6..4913e77 100644
--- a/accord-core/src/main/java/accord/txn/Writes.java
+++ b/accord-core/src/main/java/accord/txn/Writes.java
@@ -19,7 +19,7 @@ public class Writes
     public void apply(Instance instance)
     {
         if (write != null)
-            write.apply(instance.shard.start, instance.shard.end, executeAt, instance.store());
+            write.apply(instance.shard.range, executeAt, instance.store());
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/utils/KeyRange.java b/accord-core/src/main/java/accord/utils/KeyRange.java
deleted file mode 100644
index 6ade671..0000000
--- a/accord-core/src/main/java/accord/utils/KeyRange.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package accord.utils;
-
-import accord.api.Key;
-
-public class KeyRange<K extends Key<K>>
-{
-    public final K start;
-    public final K end;
-
-    private KeyRange(K start, K end)
-    {
-        this.start = start;
-        this.end = end;
-    }
-
-    public static <K extends Key<K>> KeyRange<K> of(K start, K end)
-    {
-        return new KeyRange<>(start, end);
-    }
-}
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 79460e1..7d1c696 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -5,11 +5,19 @@ import java.util.List;
 import java.util.zip.CRC32C;
 
 import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
 import accord.txn.Keys;
 
 public class IntHashKey implements Key<IntHashKey>
 {
+    private static class Range extends KeyRange.EndInclusive<IntHashKey>
+    {
+        public Range(IntHashKey start, IntHashKey end)
+        {
+            super(start, end);
+        }
+    }
+
     public final int key;
     public final int hash;
 
@@ -47,11 +55,6 @@ public class IntHashKey implements Key<IntHashKey>
         return new Keys(keys);
     }
 
-    public static KeyRange<IntHashKey> range(int start, int end)
-    {
-        return KeyRange.of(key(start), key(end));
-    }
-
     public static KeyRange<IntHashKey>[] ranges(int count)
     {
         List<KeyRange<IntHashKey>> result = new ArrayList<>();
@@ -61,10 +64,10 @@ public class IntHashKey implements Key<IntHashKey>
         for (int i = 1 ; i < count ; ++i)
         {
             IntHashKey next = new IntHashKey(Integer.MIN_VALUE, (int)Math.min(Integer.MAX_VALUE, start + i * delta));
-            result.add(KeyRange.of(prev, next));
+            result.add(new Range(prev, next));
             prev = next;
         }
-        result.add(KeyRange.of(prev, new IntHashKey(Integer.MIN_VALUE, Integer.MAX_VALUE)));
+        result.add(new Range(prev, new IntHashKey(Integer.MIN_VALUE, Integer.MAX_VALUE)));
         return result.toArray(KeyRange[]::new);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index c24472b..eb57364 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -2,16 +2,25 @@ package accord.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
 import accord.txn.Keys;
 
 public class IntKey implements Key<IntKey>
 {
+    private static class Range extends KeyRange.EndInclusive<IntKey>
+    {
+        public Range(IntKey start, IntKey end)
+        {
+            super(start, end);
+        }
+    }
+
     public final int key;
 
-    private IntKey(int key)
+    public IntKey(int key)
     {
         this.key = key;
     }
@@ -30,16 +39,21 @@ public class IntKey implements Key<IntKey>
     public static Keys keys(int k0, int... kn)
     {
         Key[] keys = new Key[kn.length + 1];
-        keys[0] = key(k0);
+        keys[0] = new IntKey(k0);
         for (int i=0; i<kn.length; i++)
-            keys[i + 1] = key(kn[i]);
+            keys[i + 1] = new IntKey(kn[i]);
 
         return new Keys(keys);
     }
 
+    public static KeyRange<IntKey> range(IntKey start, IntKey end)
+    {
+        return new Range(start, end);
+    }
+
     public static KeyRange<IntKey> range(int start, int end)
     {
-        return KeyRange.of(key(start), key(end));
+        return range(key(start), key(end));
     }
 
     public static KeyRange<IntKey>[] ranges(int count)
@@ -51,10 +65,10 @@ public class IntKey implements Key<IntKey>
         for (int i = 1 ; i < count ; ++i)
         {
             IntKey next = new IntKey((int)Math.min(Integer.MAX_VALUE, start + i * delta));
-            result.add(KeyRange.of(prev, next));
+            result.add(new Range(prev, next));
             prev = next;
         }
-        result.add(KeyRange.of(prev, IntKey.key(Integer.MAX_VALUE)));
+        result.add(new Range(prev, new IntKey(Integer.MAX_VALUE)));
         return result.toArray(KeyRange[]::new);
     }
 
@@ -63,4 +77,19 @@ public class IntKey implements Key<IntKey>
     {
         return Integer.toString(key);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        IntKey intKey = (IntKey) o;
+        return key == intKey.key;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(key);
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index a8dad3b..ccdde7e 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -4,7 +4,7 @@ package accord.impl;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Key;
-import accord.utils.KeyRange;
+import accord.api.KeyRange;
 import accord.topology.Shard;
 import accord.topology.Shards;
 import accord.utils.WrapAroundList;
@@ -42,7 +42,7 @@ public class TopologyFactory<K extends Key<K>>
 
         final List<Shard> shards = new ArrayList<>();
         for (int i = 0 ; i < ranges.length ; ++i)
-            shards.add(new Shard(ranges[i].start, ranges[i].end, electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+            shards.add(new Shard(ranges[i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
         return new Shards(shards.toArray(Shard[]::new));
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 3bed2dd..f70ffeb 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -1,11 +1,10 @@
 package accord.impl.list;
 
-import accord.api.Data;
-import accord.api.Key;
-import accord.api.Store;
-import accord.api.Read;
+import accord.api.*;
 import accord.txn.Keys;
 
+import static java.lang.Math.max;
+
 public class ListRead implements Read
 {
     public final Keys keys;
@@ -16,11 +15,14 @@ public class ListRead implements Read
     }
 
     @Override
-    public Data read(Key start, Key end, Store store)
+    public Data read(KeyRange range, Store store)
     {
         ListStore s = (ListStore)store;
         ListData result = new ListData();
-        for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+        int lowIdx = range.lowKeyIndex(keys);
+        if (lowIdx < 0)
+            return result;
+        for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
             result.put(keys.get(i), s.get(keys.get(i)));
         return result;
     }
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index bc02241..20694f1 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -1,9 +1,11 @@
 package accord.impl.list;
 
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import accord.api.Key;
+import accord.api.KeyRange;
 import accord.api.Store;
 import accord.api.Write;
 import accord.txn.Timestamp;
@@ -12,10 +14,12 @@ import accord.utils.Timestamped;
 public class ListWrite extends TreeMap<Key, int[]> implements Write
 {
     @Override
-    public void apply(Key start, Key end, Timestamp executeAt, Store store)
+    public void apply(KeyRange range, Timestamp executeAt, Store store)
     {
         ListStore s = (ListStore) store;
-        for (Map.Entry<Key, int[]> e : subMap(start, true, end, false).entrySet())
+        NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(),
+                                                    range.end(), range.endInclusive());
+        for (Map.Entry<Key, int[]> e : selection.entrySet())
             s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index d407531..8580448 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -3,10 +3,9 @@ package accord.impl.mock;
 import accord.NetworkFilter;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.messages.*;
+import accord.messages.Timeout;
 import accord.utils.ThreadPoolScheduler;
 import accord.txn.TxnId;
-import accord.utils.KeyRange;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
@@ -85,7 +84,7 @@ public class MockCluster implements Network
             Id nextId = nextNodeId();
             ids.add(nextId);
         }
-        TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, KeyRange.of(IntKey.key(0), IntKey.key(config.maxKey)));
+        TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, IntKey.range(0, config.maxKey));
         Shards topology = topologyFactory.toShards(ids);
         for (int i=0; i<config.initialNodes; i++)
         {
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 3a27958..36b2271 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -19,8 +19,8 @@ public class MockStore implements Store
     };
 
     public static final Result RESULT = new Result() {};
-    public static final Read READ = (start, end, store) -> DATA;
+    public static final Read READ = (range, store) -> DATA;
     public static final Query QUERY = data -> RESULT;
-    public static final Write WRITE = (start, end, executeAt, store) -> {};
+    public static final Write WRITE = (range, executeAt, store) -> {};
     public static final Update UPDATE = data -> WRITE;
 }
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
new file mode 100644
index 0000000..76a1fa9
--- /dev/null
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -0,0 +1,77 @@
+package accord.topology;
+
+import accord.Utils;
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.impl.TopologyFactory;
+import accord.local.Node;
+import accord.txn.Keys;
+import com.google.common.collect.Iterables;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static accord.impl.IntKey.key;
+import static accord.impl.IntKey.range;
+
+public class TopologyTest
+{
+
+    private static void assertRangeForKey(Topology topology, int key, int start, int end)
+    {
+        Key expectedKey = key(key);
+        Shard shard = topology.forKey(key(key));
+        KeyRange expectedRange = range(start, end);
+        Assertions.assertTrue(expectedRange.containsKey(expectedKey));
+        Assertions.assertTrue(shard.range.containsKey(expectedKey));
+        Assertions.assertEquals(expectedRange, shard.range);
+
+        Shards shards = topology.forKeys(Keys.of(expectedKey));
+        shard = Iterables.getOnlyElement(shards);
+        Assertions.assertTrue(shard.range.containsKey(expectedKey));
+        Assertions.assertEquals(expectedRange, shard.range);
+    }
+
+    private static Topology topology(List<Node.Id> ids, int rf, KeyRange... ranges)
+    {
+        TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(rf, ranges);
+        return topologyFactory.toShards(ids);
+    }
+
+    private static Topology topology(int numNodes, int rf, KeyRange... ranges)
+    {
+        return topology(Utils.ids(numNodes), rf, ranges);
+    }
+
+    private static Topology topology(KeyRange... ranges)
+    {
+        return topology(1, 1, ranges);
+    }
+
+    private static KeyRange<IntKey> r(int start, int end)
+    {
+        return IntKey.range(start, end);
+    }
+
+    @Test
+    void forKeyTest()
+    {
+        Topology topology = topology(r(0, 100), r(100, 200), r(200, 300));
+        assertRangeForKey(topology, 50, 0, 100);
+        assertRangeForKey(topology, 100, 0, 100);
+    }
+
+    @Test
+    void forRangesTest()
+    {
+
+    }
+
+    @Test
+    void sequentialRanges()
+    {
+        // TODO: confirm non-sequential ranges are handled properly
+    }
+}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangeTest.java b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
new file mode 100644
index 0000000..b6ce4a4
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
@@ -0,0 +1,150 @@
+package accord.utils;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.txn.Keys;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KeyRangeTest
+{
+    static IntKey k(int v)
+    {
+        return new IntKey(v);
+    }
+
+    static KeyRange<IntKey> rangeEndIncl(int start, int end)
+    {
+        return new KeyRange.EndInclusive<>(k(start), k(end)) {};
+    }
+
+    static KeyRange<IntKey> rangeStartIncl(int start, int end)
+    {
+        return new KeyRange.StartInclusive<>(k(start), k(end)) {};
+    }
+
+    static Keys keys(int... values)
+    {
+        Key[] keys = new Key[values.length];
+        for (int i=0; i<values.length; i++)
+            keys[i] = IntKey.key(values[i]);
+        return new Keys(keys);
+    }
+
+    private static void assertInvalidKeyRange(int start, int end)
+    {
+        try
+        {
+            rangeStartIncl(start, end);
+            Assertions.fail("Expected IllegalArgumentException");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected
+        }
+
+        try
+        {
+            rangeEndIncl(start, end);
+            Assertions.fail("Expected IllegalArgumentException");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // expected
+        }
+    }
+
+    @Test
+    void invalidRangeTest()
+    {
+        assertInvalidKeyRange(1, 1);
+        assertInvalidKeyRange(2, 1);
+    }
+
+    @Test
+    void containsTest()
+    {
+        KeyRange<IntKey> endInclRange = rangeEndIncl(10, 20);
+        Assertions.assertFalse(endInclRange.containsKey(k(10)));
+        Assertions.assertFalse(endInclRange.startInclusive());
+        Assertions.assertTrue(endInclRange.containsKey(k(20)));
+        Assertions.assertTrue(endInclRange.endInclusive());
+
+        KeyRange<IntKey> startInclRange = rangeStartIncl(10, 20);
+        Assertions.assertTrue(startInclRange.containsKey(k(10)));
+        Assertions.assertTrue(startInclRange.startInclusive());
+        Assertions.assertFalse(startInclRange.containsKey(k(20)));
+        Assertions.assertFalse(startInclRange.endInclusive());
+    }
+
+    private static void assertHigherKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+    {
+        if (expectedIdx > 0 && expectedIdx < keys.size())
+            Assertions.assertTrue(range.containsKey(keys.get(expectedIdx - 1)));
+        int actualIdx = range.higherKeyIndex(keys);
+        Assertions.assertEquals(expectedIdx, actualIdx);
+    }
+
+    @Test
+    void higherKeyIndexTest()
+    {
+        Keys keys = keys(10, 11, 12, 13, 14, 15, 16);
+        assertHigherKeyIndex(0, rangeEndIncl(0, 9), keys);
+        assertHigherKeyIndex(0, rangeStartIncl(0, 10), keys);
+        assertHigherKeyIndex(0, rangeEndIncl(0, 5), keys);
+        assertHigherKeyIndex(0, rangeStartIncl(0, 5), keys);
+
+        assertHigherKeyIndex(1, rangeEndIncl(9, 10), keys);
+        assertHigherKeyIndex(0, rangeStartIncl(9, 10), keys);
+        assertHigherKeyIndex(5, rangeEndIncl(11, 14), keys);
+        assertHigherKeyIndex(4, rangeStartIncl(11, 14), keys);
+        assertHigherKeyIndex(6, rangeEndIncl(11, 15), keys);
+        assertHigherKeyIndex(5, rangeStartIncl(11, 15), keys);
+
+        assertHigherKeyIndex(7, rangeEndIncl(16, 25), keys);
+        assertHigherKeyIndex(7, rangeStartIncl(16, 25), keys);
+        assertHigherKeyIndex(7, rangeEndIncl(20, 25), keys);
+        assertHigherKeyIndex(7, rangeStartIncl(20, 25), keys);
+    }
+
+    private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+    {
+        if (expectedIdx >= 0 && expectedIdx < keys.size())
+        {
+            Assertions.assertTrue(range.containsKey(keys.get(expectedIdx)));
+        }
+        else
+        {
+            Assertions.assertFalse(range.containsKey(keys.get(0)));
+            Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+        }
+
+        int actualIdx = range.lowKeyIndex(keys);
+        Assertions.assertEquals(expectedIdx, actualIdx);
+    }
+
+    @Test
+    void lowKeyIndexTest()
+    {
+        Keys keys = keys(10, 11, 12, 13, 14, 15, 16);
+        assertLowKeyIndex(-1, rangeEndIncl(0, 5), keys);
+        assertLowKeyIndex(-1, rangeStartIncl(0, 5), keys);
+        assertLowKeyIndex(-1, rangeEndIncl(0, 9), keys);
+        assertLowKeyIndex(-1, rangeStartIncl(0, 9), keys);
+
+        assertLowKeyIndex(0, rangeEndIncl(5, 10), keys);
+        assertLowKeyIndex(-1, rangeStartIncl(5, 10), keys);
+        assertLowKeyIndex(2, rangeEndIncl(11, 15), keys);
+        assertLowKeyIndex(1, rangeStartIncl(11, 15), keys);
+        assertLowKeyIndex(3, rangeEndIncl(12, 14), keys);
+        assertLowKeyIndex(2, rangeStartIncl(12, 14), keys);
+        assertLowKeyIndex(6, rangeEndIncl(15, 20), keys);
+        assertLowKeyIndex(5, rangeStartIncl(15, 20), keys);
+
+        assertLowKeyIndex(7, rangeEndIncl(16, 20), keys);
+        assertLowKeyIndex(6, rangeStartIncl(16, 20), keys);
+        assertLowKeyIndex(7, rangeEndIncl(20, 25), keys);
+        assertLowKeyIndex(7, rangeStartIncl(20, 25), keys);
+    }
+}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangesTest.java b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
new file mode 100644
index 0000000..4b3990e
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
@@ -0,0 +1,31 @@
+package accord.utils;
+
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.topology.KeyRanges;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KeyRangesTest
+{
+    private static KeyRange<IntKey> r(int start, int end)
+    {
+        return IntKey.range(start, end);
+    }
+
+    private static KeyRanges ranges(KeyRange... ranges)
+    {
+        return new KeyRanges(ranges);
+    }
+
+    @Test
+    void rangeIndexForKeyTest()
+    {
+        KeyRanges ranges = ranges(r(100, 200), r(300, 400));
+        Assertions.assertEquals(-1, ranges.rangeIndexForKey(IntKey.key(50)));
+        Assertions.assertEquals(0, ranges.rangeIndexForKey(IntKey.key(150)));
+        Assertions.assertEquals(-2, ranges.rangeIndexForKey(IntKey.key(250)));
+        Assertions.assertEquals(1, ranges.rangeIndexForKey(IntKey.key(350)));
+        Assertions.assertEquals(-3, ranges.rangeIndexForKey(IntKey.key(450)));
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index a16ccd2..208df48 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -3,12 +3,21 @@ package accord.maelstrom;
 import java.io.IOException;
 
 import accord.api.Key;
+import accord.api.KeyRange;
 import com.google.gson.TypeAdapter;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
 
 public class MaelstromKey extends Datum<MaelstromKey> implements Key<MaelstromKey>
 {
+    public static class Range extends KeyRange.EndInclusive<MaelstromKey>
+    {
+        public Range(MaelstromKey start, MaelstromKey end)
+        {
+            super(start, end);
+        }
+    }
+
     public MaelstromKey(Kind kind, Object value)
     {
         super(kind, value);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 5a2630f..7773ba8 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -1,11 +1,10 @@
 package accord.maelstrom;
 
-import accord.api.Data;
-import accord.api.Key;
-import accord.api.Store;
-import accord.api.Read;
+import accord.api.*;
 import accord.txn.Keys;
 
+import static java.lang.Math.max;
+
 public class MaelstromRead implements Read
 {
     final Keys keys;
@@ -16,11 +15,14 @@ public class MaelstromRead implements Read
     }
 
     @Override
-    public Data read(Key start, Key end, Store store)
+    public Data read(KeyRange range, Store store)
     {
         MaelstromStore s = (MaelstromStore)store;
         MaelstromData result = new MaelstromData();
-        for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+        int lowIdx = range.lowKeyIndex(keys);
+        if (lowIdx < 0)
+            return result;
+        for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
             result.put(keys.get(i), s.get(keys.get(i)));
         return result;
     }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 5adbd9d..31ec395 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -1,9 +1,11 @@
 package accord.maelstrom;
 
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import accord.api.Key;
+import accord.api.KeyRange;
 import accord.api.Store;
 import accord.api.Write;
 import accord.txn.Timestamp;
@@ -12,10 +14,12 @@ import accord.utils.Timestamped;
 public class MaelstromWrite extends TreeMap<Key, Value> implements Write
 {
     @Override
-    public void apply(Key start, Key end, Timestamp executeAt, Store store)
+    public void apply(KeyRange range, Timestamp executeAt, Store store)
     {
         MaelstromStore s = (MaelstromStore) store;
-        for (Map.Entry<Key, Value> e : subMap(start, true, end, false).entrySet())
+        NavigableMap<Key, Value> selection = subMap(range.start(), range.startInclusive(),
+                                                    range.end(), range.endInclusive());
+        for (Map.Entry<Key, Value> e : selection.entrySet())
             s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
index bfa8e5e..f9dcdc1 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java
@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import accord.api.KeyRange;
 import accord.local.Node.Id;
 import accord.maelstrom.Datum.Kind;
 import accord.topology.Shard;
@@ -18,22 +19,24 @@ public class TopologyFactory
     final int shards;
     final int rf;
     final Kind[] kinds;
-    final MaelstromKey[][] starts, ends;
+    final KeyRange<MaelstromKey>[][] ranges;
 
     public TopologyFactory(int shards, int rf)
     {
         this.shards = shards;
         this.rf = rf;
         this.kinds = Datum.COMPARE_BY_HASH ? new Kind[] { Kind.HASH } : new Kind[] { Kind.STRING, Kind.LONG, Kind.DOUBLE };
-        this.starts = new MaelstromKey[kinds.length][shards];
-        this.ends = new MaelstromKey[kinds.length][shards];
+        this.ranges = new MaelstromKey.Range[kinds.length][shards];
         for (int i = 0 ; i < kinds.length ; ++i)
         {
             Kind kind = kinds[i];
-            starts[i] = kind.split(shards);
-            ends[i] = new MaelstromKey[shards];
-            System.arraycopy(starts[i], 1, ends[i], 0, shards - 1);
-            ends[i][shards - 1] = new MaelstromKey(kind, null);
+            MaelstromKey[] starts = kind.split(shards);
+            MaelstromKey[] ends = new MaelstromKey[shards];
+            System.arraycopy(starts, 1, ends, 0, shards - 1);
+            ends[shards - 1] = new MaelstromKey(kind, null);
+            this.ranges[i] = new MaelstromKey.Range[shards];
+            for (int j=0; j<shards; j++)
+                ranges[i][j] = new MaelstromKey.Range(starts[j], ends[j]);
         }
     }
 
@@ -59,7 +62,7 @@ public class TopologyFactory
         for (int j = 0 ; j < kinds.length ; ++j)
         {
             for (int i = 0 ; i < this.shards ; ++i)
-                shards.add(new Shard(starts[j][i], ends[j][i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+                shards.add(new Shard(ranges[j][i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
         }
         return new Shards(shards.toArray(Shard[]::new));
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org