You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/09/20 15:24:29 UTC

[ignite-3] branch main updated: IGNITE-17318 Implement RocksDB based sorted index storage (#1076)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dec967219a IGNITE-17318 Implement RocksDB based sorted index storage (#1076)
dec967219a is described below

commit dec967219a1a5a1c6a8de25586effd841a8eec73
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Sep 20 18:24:23 2022 +0300

    IGNITE-17318 Implement RocksDB based sorted index storage (#1076)
---
 .../internal/binarytuple/BinaryTupleBuilder.java   |  76 ++--
 .../internal/binarytuple/BinaryTupleCommon.java    |  21 +
 .../internal/binarytuple/BinaryTupleParser.java    |   8 +-
 .../binarytuple/BinaryTuplePrefixBuilder.java      |  78 ++++
 .../internal/binarytuple/BinaryTupleTest.java      |  68 ++--
 .../handler/requests/table/ClientTableCommon.java  |   2 +-
 .../internal/client/table/ClientKeyValueView.java  |   2 +-
 .../client/table/ClientRecordSerializer.java       |   2 +-
 .../client/table/ClientTupleSerializer.java        |   4 +-
 .../raft/RocksDbClusterStateStorage.java           |   2 +-
 .../raft/AbstractClusterStateStorageTest.java      |   2 +-
 .../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs     |   2 +-
 .../apache/ignite/internal/rocksdb/RocksUtils.java |  18 +-
 .../rocksdb/flush/RocksDbFlushListener.java        |   9 +-
 .../internal/rocksdb/flush/RocksDbFlusher.java     |  25 +-
 .../ignite/internal/schema/BinaryConverter.java    |   3 +-
 .../apache/ignite/internal/schema/BinaryRow.java   |   5 +
 .../ignite/internal/schema/BinaryTuplePrefix.java  |  71 ++++
 .../ignite/internal/schema/ByteBufferRow.java      |   5 +
 .../ignite/internal/schema/row/InternalTuple.java  |   6 +
 .../org/apache/ignite/internal/schema/row/Row.java |   5 +
 .../internal/schema/BinaryTuplePrefixTest.java     |  99 +++++
 .../storage/index}/BinaryTupleComparator.java      |  78 ++--
 .../storage/index/HashIndexDescriptor.java         |   1 -
 .../storage/index/SortedIndexDescriptor.java       |  70 +++-
 .../internal/storage/index/SortedIndexStorage.java |   6 +-
 .../index/AbstractSortedIndexStorageTest.java      |  41 +-
 .../storage/index/BinaryTupleComparatorTest.java   | 443 +++++++++++++++++++++
 .../storage/index/TestSortedIndexStorageTest.java  |   4 +-
 .../internal/storage/index/impl/TestIndexRow.java  |   3 +-
 .../storage/AbstractMvTableStorageTest.java        |  12 +-
 .../index/AbstractHashIndexStorageTest.java        |   7 +-
 .../index/impl/BinaryTupleRowSerializer.java       |  54 ++-
 .../internal/storage/index/impl/IndexRowImpl.java  |  46 ---
 .../storage/index/impl/TestSortedIndexStorage.java |  59 ++-
 .../storage/rocksdb/ColumnFamilyUtils.java         |  34 +-
 .../rocksdb/{HashIndexes.java => HashIndex.java}   |  18 +-
 .../storage/rocksdb/RocksDbMetaStorage.java        |   2 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  92 ++++-
 .../internal/storage/rocksdb/SortedIndex.java      |  69 ++++
 .../storage/rocksdb/index/CursorUtils.java         | 259 ++++++++++++
 .../index/RocksDbBinaryTupleComparator.java        | 102 +++++
 .../rocksdb/index/RocksDbHashIndexStorage.java     |   6 +-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   | 240 +++++++++++
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  19 -
 .../storage/rocksdb/index/CursorUtilsTest.java     |  90 +++++
 .../rocksdb/index/RocksDbHashIndexStorageTest.java |  25 +-
 ...est.java => RocksDbSortedIndexStorageTest.java} |  35 +-
 48 files changed, 1981 insertions(+), 347 deletions(-)

diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index 7f09077556..12c7527f68 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -33,7 +33,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.BitSet;
 import java.util.UUID;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Utility to construct a binary tuple.
@@ -67,13 +67,23 @@ public class BinaryTupleBuilder {
     private boolean hasNullValues = false;
 
     /**
-     * Constructor.
+     * Creates a builder.
+     *
+     * @param numElements Number of tuple elements.
+     * @param allowNulls True if NULL values are possible, false otherwise.
+     */
+    public BinaryTupleBuilder(int numElements, boolean allowNulls) {
+        this(numElements, allowNulls, -1);
+    }
+
+    /**
+     * Creates a builder.
      *
      * @param numElements Number of tuple elements.
      * @param allowNulls True if NULL values are possible, false otherwise.
      * @param totalValueSize Total estimated length of non-NULL values, -1 if not known.
      */
-    private BinaryTupleBuilder(int numElements, boolean allowNulls, int totalValueSize) {
+    public BinaryTupleBuilder(int numElements, boolean allowNulls, int totalValueSize) {
         this.numElements = numElements;
 
         int base = BinaryTupleCommon.HEADER_SIZE;
@@ -94,29 +104,6 @@ public class BinaryTupleBuilder {
         allocate(totalValueSize);
     }
 
-    /**
-     * Creates a builder.
-     *
-     * @param numElements Number of tuple elements.
-     * @param allowNulls True if NULL values are possible, false otherwise.
-     * @return Tuple builder.
-     */
-    public static BinaryTupleBuilder create(int numElements, boolean allowNulls) {
-        return create(numElements, allowNulls, -1);
-    }
-
-    /**
-     * Creates a builder.
-     *
-     * @param numElements Number of tuple elements.
-     * @param allowNulls True if NULL values are possible, false otherwise.
-     * @param totalValueSize Total estimated length of non-NULL values, -1 if not known.
-     * @return Tuple builder.
-     */
-    public static BinaryTupleBuilder create(int numElements, boolean allowNulls, int totalValueSize) {
-        return new BinaryTupleBuilder(numElements, allowNulls, totalValueSize);
-    }
-
     /**
      * Check if the binary tuple contains a null map.
      */
@@ -223,7 +210,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendInt(Integer value) {
+    public BinaryTupleBuilder appendInt(@Nullable Integer value) {
         return value == null ? appendNull() : appendInt(value.intValue());
     }
 
@@ -308,7 +295,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendNumberNotNull(@NotNull BigInteger value) {
+    public BinaryTupleBuilder appendNumberNotNull(BigInteger value) {
         putBytes(value.toByteArray());
         return proceed();
     }
@@ -330,7 +317,7 @@ public class BinaryTupleBuilder {
      * @param scale Decimal scale.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendDecimalNotNull(@NotNull BigDecimal value, int scale) {
+    public BinaryTupleBuilder appendDecimalNotNull(BigDecimal value, int scale) {
         putBytes(value.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray());
         return proceed();
     }
@@ -352,7 +339,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendStringNotNull(@NotNull String value) {
+    public BinaryTupleBuilder appendStringNotNull(String value) {
         try {
             putString(value);
         } catch (CharacterCodingException e) {
@@ -367,7 +354,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendString(String value) {
+    public BinaryTupleBuilder appendString(@Nullable String value) {
         return value == null ? appendNull() : appendStringNotNull(value);
     }
 
@@ -377,7 +364,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendBytesNotNull(@NotNull byte[] value) {
+    public BinaryTupleBuilder appendBytesNotNull(byte[] value) {
         putBytes(value);
         return proceed();
     }
@@ -398,7 +385,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendUuidNotNull(@NotNull UUID value) {
+    public BinaryTupleBuilder appendUuidNotNull(UUID value) {
         long lsb = value.getLeastSignificantBits();
         long msb = value.getMostSignificantBits();
         if ((lsb | msb) != 0L) {
@@ -424,7 +411,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendBitmaskNotNull(@NotNull BitSet value) {
+    public BinaryTupleBuilder appendBitmaskNotNull(BitSet value) {
         putBytes(value.toByteArray());
         return proceed();
     }
@@ -445,7 +432,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendDateNotNull(@NotNull LocalDate value) {
+    public BinaryTupleBuilder appendDateNotNull(LocalDate value) {
         if (value != BinaryTupleCommon.DEFAULT_DATE) {
             putDate(value);
         }
@@ -468,7 +455,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendTimeNotNull(@NotNull LocalTime value) {
+    public BinaryTupleBuilder appendTimeNotNull(LocalTime value) {
         if (value != BinaryTupleCommon.DEFAULT_TIME) {
             putTime(value);
         }
@@ -491,7 +478,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendDateTimeNotNull(@NotNull LocalDateTime value) {
+    public BinaryTupleBuilder appendDateTimeNotNull(LocalDateTime value) {
         if (value != BinaryTupleCommon.DEFAULT_DATE_TIME) {
             putDate(value.toLocalDate());
             putTime(value.toLocalTime());
@@ -515,7 +502,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendTimestampNotNull(@NotNull Instant value) {
+    public BinaryTupleBuilder appendTimestampNotNull(Instant value) {
         if (value != BinaryTupleCommon.DEFAULT_TIMESTAMP) {
             long seconds = value.getEpochSecond();
             int nanos = value.getNano();
@@ -543,7 +530,7 @@ public class BinaryTupleBuilder {
      * @param bytes Buffer with element raw bytes.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendElementBytes(@NotNull ByteBuffer bytes) {
+    public BinaryTupleBuilder appendElementBytes(ByteBuffer bytes) {
         putElement(bytes);
         return proceed();
     }
@@ -556,7 +543,7 @@ public class BinaryTupleBuilder {
      * @param length Length of the element in the buffer.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendElementBytes(@NotNull ByteBuffer bytes, int offset, int length) {
+    public BinaryTupleBuilder appendElementBytes(ByteBuffer bytes, int offset, int length) {
         putElement(bytes, offset, length);
         return proceed();
     }
@@ -570,6 +557,15 @@ public class BinaryTupleBuilder {
         return elementIndex;
     }
 
+    /**
+     * Gets the expected number of tuple elements.
+     *
+     * @return Expected number of tuple elements.
+     */
+    public int numElements() {
+        return numElements;
+    }
+
     /**
      * Finalize tuple building.
      *
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
index 5d5e3eb409..43e719db6d 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleCommon.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.binarytuple;
 
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -37,6 +38,13 @@ public class BinaryTupleCommon {
     /** Flag that indicates null map presence. */
     public static final int NULLMAP_FLAG = 0b100;
 
+    /**
+     * Flag that indicates that a Binary Tuple is instead a Binary Tuple Prefix.
+     *
+     * @see BinaryTuplePrefixBuilder
+     */
+    public static final int PREFIX_FLAG = 0b1000;
+
     /** Default value for UUID elements. */
     public static final UUID DEFAULT_UUID = new UUID(0, 0);
 
@@ -110,4 +118,17 @@ public class BinaryTupleCommon {
     public static byte nullMask(int index) {
         return (byte) (1 << (index % 8));
     }
+
+    /**
+     * Returns {@code true} if the given {@code buffer} represents a Binary Tuple Prefix.
+     *
+     * @param buffer Buffer containing a serialized Binary Tuple or Binary Tuple Prefix.
+     * @return {@code true} if the given {@code buffer} represents a Binary Tuple Prefix or {@code false} otherwise.
+     * @see BinaryTuplePrefixBuilder
+     */
+    public static boolean isPrefix(ByteBuffer buffer) {
+        byte flags = buffer.get(0);
+
+        return (flags & PREFIX_FLAG) != 0;
+    }
 }
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
index f96d4451ec..47e8e4ca87 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
@@ -49,6 +49,9 @@ public class BinaryTupleParser {
     /** UUID size in bytes. */
     private static final int UUID_SIZE = 16;
 
+    /** Byte order of ByteBuffers that contain the tuple. */
+    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
     /** Number of elements in the tuple. */
     private final int numElements;
 
@@ -73,13 +76,14 @@ public class BinaryTupleParser {
     public BinaryTupleParser(int numElements, ByteBuffer buffer) {
         this.numElements = numElements;
 
-        assert buffer.order() == ByteOrder.LITTLE_ENDIAN;
+        assert buffer.order() == ORDER;
         assert buffer.position() == 0;
         this.buffer = buffer;
 
         byte flags = buffer.get(0);
 
         int base = BinaryTupleCommon.HEADER_SIZE;
+
         if ((flags & BinaryTupleCommon.NULLMAP_FLAG) != 0) {
             base += BinaryTupleCommon.nullMapSize(numElements);
         }
@@ -114,7 +118,7 @@ public class BinaryTupleParser {
      * Returns the content of this tuple as a byte buffer.
      */
     public ByteBuffer byteBuffer() {
-        return buffer.slice();
+        return buffer.slice().order(ORDER);
     }
 
     /**
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java
new file mode 100644
index 0000000000..173765b39f
--- /dev/null
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTuplePrefixBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Class for build Binary Tuple Prefixes.
+ *
+ * <p>A Binary Tuple Prefix is an extension of the Binary Tuple format, that is, it adds an additional field, containing the number of
+ * elements in the prefix, at the end of the serialized tuple representation. This is helpful in cases when it is required to de-serialize
+ * such prefix while only having the full tuple schema.
+ *
+ * <p>The builder also sets the {@link BinaryTupleCommon#PREFIX_FLAG} in the flags region in order to able to distinguish such prefixes
+ * from regular tuples.
+ */
+public class BinaryTuplePrefixBuilder extends BinaryTupleBuilder {
+    private final int prefixNumElements;
+
+    /**
+     * Creates a new builder.
+     *
+     * @param prefixNumElements Number of elements in the prefix.
+     * @param fullNumElements Number of elements in the Binary Tuple Schema.
+     */
+    public BinaryTuplePrefixBuilder(int prefixNumElements, int fullNumElements) {
+        super(fullNumElements + 1, true, -1);
+
+        this.prefixNumElements = prefixNumElements;
+    }
+
+    @Override
+    public ByteBuffer build() {
+        int elementIndex = elementIndex();
+
+        if (elementIndex != prefixNumElements) {
+            throw new IllegalStateException(String.format(
+                    "Unexpected amount of elements in a BinaryTuple prefix. Expected: %d, actual %d",
+                    prefixNumElements, elementIndex
+            ));
+        }
+
+        int numElements = numElements();
+
+        // Use nulls instead of the missing elements.
+        while (elementIndex() < numElements - 1) {
+            appendNull();
+        }
+
+        appendInt(prefixNumElements);
+
+        ByteBuffer tuple = super.build();
+
+        // Set the flag indicating that this tuple is a prefix.
+        byte flags = tuple.get(0);
+
+        flags |= BinaryTupleCommon.PREFIX_FLAG;
+
+        tuple.put(0, flags);
+
+        return tuple;
+    }
+}
diff --git a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
index 969ba0acf6..43e66977ed 100644
--- a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
+++ b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/BinaryTupleTest.java
@@ -136,7 +136,7 @@ public class BinaryTupleTest {
     public void byteTest() {
         byte[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
         for (byte value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
             ByteBuffer bytes = builder.appendByte(value).build();
             assertEquals(value != 0 ? 1 : 0, bytes.get(1));
             assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -153,7 +153,7 @@ public class BinaryTupleTest {
     public void shortTest() {
         short[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
         for (short value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
             ByteBuffer bytes = builder.appendShort(value).build();
             assertEquals(value != 0 ? 1 : 0, bytes.get(1));
             assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -164,7 +164,7 @@ public class BinaryTupleTest {
 
         values = new short[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
         for (short value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
             ByteBuffer bytes = builder.appendShort(value).build();
             assertEquals(2, bytes.get(1));
             assertEquals(4, bytes.limit());
@@ -181,7 +181,7 @@ public class BinaryTupleTest {
     public void intTest() {
         int[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
         for (int value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
             ByteBuffer bytes = builder.appendInt(value).build();
             assertEquals(value != 0 ? 1 : 0, bytes.get(1));
             assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -192,7 +192,7 @@ public class BinaryTupleTest {
 
         values = new int[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
         for (int value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
             ByteBuffer bytes = builder.appendInt(value).build();
             assertEquals(2, bytes.get(1));
             assertEquals(4, bytes.limit());
@@ -203,7 +203,7 @@ public class BinaryTupleTest {
 
         values = new int[]{Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MAX_VALUE + 1, Integer.MAX_VALUE};
         for (int value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
             ByteBuffer bytes = builder.appendInt(value).build();
             assertEquals(4, bytes.get(1));
             assertEquals(6, bytes.limit());
@@ -220,7 +220,7 @@ public class BinaryTupleTest {
     public void longTest() {
         long[] values = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
         for (long value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 1);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 1);
             ByteBuffer bytes = builder.appendLong(value).build();
             assertEquals(value != 0 ? 1 : 0, bytes.get(1));
             assertEquals(value != 0 ? 3 : 2, bytes.limit());
@@ -231,7 +231,7 @@ public class BinaryTupleTest {
 
         values = new long[]{Short.MIN_VALUE, Byte.MIN_VALUE - 1, Byte.MAX_VALUE + 1, Short.MAX_VALUE};
         for (long value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 2);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 2);
             ByteBuffer bytes = builder.appendLong(value).build();
             assertEquals(2, bytes.get(1));
             assertEquals(4, bytes.limit());
@@ -242,7 +242,7 @@ public class BinaryTupleTest {
 
         values = new long[]{Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MAX_VALUE + 1, Integer.MAX_VALUE};
         for (long value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
             ByteBuffer bytes = builder.appendLong(value).build();
             assertEquals(4, bytes.get(1));
             assertEquals(6, bytes.limit());
@@ -253,7 +253,7 @@ public class BinaryTupleTest {
 
         values = new long[]{Long.MIN_VALUE, Integer.MIN_VALUE - 1L, Integer.MAX_VALUE + 1L, Long.MAX_VALUE};
         for (long value : values) {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 8);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 8);
             ByteBuffer bytes = builder.appendLong(value).build();
             assertEquals(8, bytes.get(1));
             assertEquals(10, bytes.limit());
@@ -271,7 +271,7 @@ public class BinaryTupleTest {
         {
             float value = 0.0F;
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 0);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 0);
             ByteBuffer bytes = builder.appendFloat(value).build();
             assertEquals(0, bytes.get(1));
             assertEquals(2, bytes.limit());
@@ -282,7 +282,7 @@ public class BinaryTupleTest {
         {
             float value = 0.5F;
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
             ByteBuffer bytes = builder.appendFloat(value).build();
             assertEquals(4, bytes.get(1));
             assertEquals(6, bytes.limit());
@@ -300,7 +300,7 @@ public class BinaryTupleTest {
         {
             double value = 0.0;
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 0);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 0);
             ByteBuffer bytes = builder.appendDouble(value).build();
             assertEquals(0, bytes.get(1));
             assertEquals(2, bytes.limit());
@@ -311,7 +311,7 @@ public class BinaryTupleTest {
         {
             double value = 0.5;
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 4);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 4);
             ByteBuffer bytes = builder.appendDouble(value).build();
             assertEquals(4, bytes.get(1));
             assertEquals(6, bytes.limit());
@@ -322,7 +322,7 @@ public class BinaryTupleTest {
         {
             double value = 0.1;
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false, 8);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false, 8);
             ByteBuffer bytes = builder.appendDouble(value).build();
             assertEquals(8, bytes.get(1));
             assertEquals(10, bytes.limit());
@@ -339,7 +339,7 @@ public class BinaryTupleTest {
     public void numberTest() {
         BigInteger value = BigInteger.valueOf(12345);
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
         ByteBuffer bytes = builder.appendNumber(value).build();
 
         BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -353,7 +353,7 @@ public class BinaryTupleTest {
     public void decimalTest() {
         BigDecimal value = new BigDecimal(BigInteger.valueOf(12345), 100);
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
         ByteBuffer bytes = builder.appendDecimal(value, 100).build();
 
         BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -369,7 +369,7 @@ public class BinaryTupleTest {
         int valueScale = 3;
         BigDecimal value = BigDecimal.valueOf(123456, valueScale);
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
         ByteBuffer bytes = builder.appendDecimal(value, schemaScale).build();
 
         BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -385,7 +385,7 @@ public class BinaryTupleTest {
     public void stringTest() {
         String[] values = {"ascii", "我愛Java", "", "a string with a bit more characters"};
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(values.length, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length, false);
         for (String value : values) {
             builder.appendString(value);
         }
@@ -411,7 +411,7 @@ public class BinaryTupleTest {
                 values[i] = generateBytes(rnd);
             }
 
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(values.length, true);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length, true);
             for (byte[] value : values) {
                 builder.appendBytes(value);
             }
@@ -439,7 +439,7 @@ public class BinaryTupleTest {
     public void uuidTest() {
         UUID value = UUID.randomUUID();
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
         ByteBuffer bytes = builder.appendUuid(value).build();
 
         BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -458,7 +458,7 @@ public class BinaryTupleTest {
             if (valueBytes != null) {
                 BitSet value = BitSet.valueOf(valueBytes);
 
-                BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+                BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
                 ByteBuffer bytes = builder.appendBitmask(value).build();
 
                 BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -474,7 +474,7 @@ public class BinaryTupleTest {
     public void dateTest() {
         LocalDate value = LocalDate.now();
 
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
         ByteBuffer bytes = builder.appendDate(value).build();
         assertEquals(3, bytes.get(1));
         assertEquals(5, bytes.limit());
@@ -491,7 +491,7 @@ public class BinaryTupleTest {
         LocalTime value = LocalTime.now();
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTime(value).build();
 
             BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -501,7 +501,7 @@ public class BinaryTupleTest {
         value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_000_000);
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTime(value).build();
             assertEquals(4, bytes.get(1));
             assertEquals(6, bytes.limit());
@@ -513,7 +513,7 @@ public class BinaryTupleTest {
         value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_000);
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTime(value).build();
             assertEquals(5, bytes.get(1));
             assertEquals(7, bytes.limit());
@@ -525,7 +525,7 @@ public class BinaryTupleTest {
         value = LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_001);
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTime(value).build();
             assertEquals(6, bytes.get(1));
             assertEquals(8, bytes.limit());
@@ -543,7 +543,7 @@ public class BinaryTupleTest {
         LocalDateTime value = LocalDateTime.now();
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendDateTime(value).build();
 
             BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -554,7 +554,7 @@ public class BinaryTupleTest {
                 LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_000_000));
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendDateTime(value).build();
             assertEquals(7, bytes.get(1));
             assertEquals(9, bytes.limit());
@@ -567,7 +567,7 @@ public class BinaryTupleTest {
                 LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_000));
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendDateTime(value).build();
             assertEquals(8, bytes.get(1));
             assertEquals(10, bytes.limit());
@@ -580,7 +580,7 @@ public class BinaryTupleTest {
                 LocalTime.of(value.getHour(), value.getMinute(), value.getSecond(), 1_001_001));
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendDateTime(value).build();
             assertEquals(9, bytes.get(1));
             assertEquals(11, bytes.limit());
@@ -599,7 +599,7 @@ public class BinaryTupleTest {
         Instant value = Instant.now();
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTimestamp(value).build();
 
             BinaryTupleReader reader = new BinaryTupleReader(1, bytes);
@@ -609,7 +609,7 @@ public class BinaryTupleTest {
         value = Instant.ofEpochSecond(value.getEpochSecond());
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTimestamp(value).build();
             assertEquals(8, bytes.get(1));
             assertEquals(10, bytes.limit());
@@ -621,7 +621,7 @@ public class BinaryTupleTest {
         value = Instant.ofEpochSecond(value.getEpochSecond(), 1);
 
         {
-            BinaryTupleBuilder builder = BinaryTupleBuilder.create(1, false);
+            BinaryTupleBuilder builder = new BinaryTupleBuilder(1, false);
             ByteBuffer bytes = builder.appendTimestamp(value).build();
             assertEquals(12, bytes.get(1));
             assertEquals(14, bytes.limit());
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 1ab66e4521..dd45979d68 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -133,7 +133,7 @@ public class ClientTableCommon {
             packer.packInt(schema.version());
         }
 
-        var builder = BinaryTupleBuilder.create(columnCount(schema, part), true);
+        var builder = new BinaryTupleBuilder(columnCount(schema, part), true);
 
         if (part != TuplePart.VAL) {
             for (var col : schema.keyColumns().columns()) {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 03daff5b6f..224ad509c3 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -458,7 +458,7 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
     }
 
     private void writeKeyValueRaw(ClientSchema s, PayloadOutputChannel w, @NotNull K key, V val) {
-        var builder = BinaryTupleBuilder.create(s.columns().length, true);
+        var builder = new BinaryTupleBuilder(s.columns().length, true);
         var noValueSet = new BitSet();
         ClientMarshallerWriter writer = new ClientMarshallerWriter(builder, noValueSet);
 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
index faadcdef21..2b133dab70 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordSerializer.java
@@ -105,7 +105,7 @@ public class ClientRecordSerializer<R> {
      */
     static <R> void writeRecRaw(@Nullable R rec, ClientMessagePacker out, Marshaller marshaller, int columnCount) {
         try {
-            var builder = BinaryTupleBuilder.create(columnCount, true);
+            var builder = new BinaryTupleBuilder(columnCount, true);
             var noValueSet = new BitSet();
 
             var writer = new ClientMarshallerWriter(builder, noValueSet);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
index 79b30172d9..0bf002c89f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
@@ -134,7 +134,7 @@ public class ClientTupleSerializer {
         var columns = schema.columns();
         var count = keyOnly ? schema.keyColumnCount() : columns.length;
 
-        var builder = BinaryTupleBuilder.create(count, true);
+        var builder = new BinaryTupleBuilder(count, true);
         var noValueSet = new BitSet(count);
 
         for (var i = 0; i < count; i++) {
@@ -172,7 +172,7 @@ public class ClientTupleSerializer {
 
         var columns = schema.columns();
         var noValueSet = new BitSet(columns.length);
-        var builder = BinaryTupleBuilder.create(columns.length, true);
+        var builder = new BinaryTupleBuilder(columns.length, true);
 
         for (var i = 0; i < columns.length; i++) {
             var col = columns[i];
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
index dd2bd9e1af..f9b281bc2f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java
@@ -135,7 +135,7 @@ public class RocksDbClusterStateStorage implements ClusterStateStorage {
 
     @Override
     public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
-        byte[] upperBound = RocksUtils.rangeEnd(prefix);
+        byte[] upperBound = RocksUtils.incrementArray(prefix);
 
         Slice upperBoundSlice = upperBound == null ? null : new Slice(upperBound);
 
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index d9c84f9064..768618a560 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -186,7 +186,7 @@ public abstract class AbstractClusterStateStorageTest {
     @Test
     void testGetWithPrefixBorder() throws Exception {
         byte[] key1 = "key1".getBytes(UTF_8);
-        byte[] key2 = RocksUtils.rangeEnd(key1);
+        byte[] key2 = RocksUtils.incrementArray(key1);
 
         storage.put(key1, "value1".getBytes(UTF_8));
         storage.put(key2, "value2".getBytes(UTF_8));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 4517a52ae3..ff8ee2b987 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -263,7 +263,7 @@ namespace Apache.Ignite.Tests.Sql
             var ex = Assert.ThrowsAsync<TableNotFoundException>(
                 async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"));
 
-            StringAssert.EndsWith("Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]", ex!.Message);
+            StringAssert.EndsWith("The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]", ex!.Message);
             StringAssert.StartsWith("IGN-TBL-2", ex.Message);
             StringAssert.StartsWith("IGN-TBL-2", ex.CodeAsString);
             StringAssert.StartsWith("TBL", ex.GroupName);
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index 5c27d8ff57..14b8877f26 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -83,28 +83,28 @@ public class RocksUtils {
      * <p>This method tries to increment the least significant byte (in BE order) that is not equal to 0xFF (bytes are treated as
      * unsigned values).
      *
-     * @param rangeStart Start of a range of keys (prefix) in RocksDB.
+     * @param array Start of a range of keys (prefix) in RocksDB.
      * @return End of a range of keys in RocksDB or {@code null} if all bytes of the prefix are equal to 0xFF.
      */
-    public static byte @Nullable [] rangeEnd(byte[] rangeStart) {
-        byte[] rangeEnd = rangeStart.clone();
+    public static byte @Nullable [] incrementArray(byte[] array) {
+        byte[] result = array.clone();
 
-        int i = rangeStart.length - 1;
+        int i = array.length - 1;
 
         // Cycle through all bytes that are equal to 0xFF
-        while (i >= 0 && rangeStart[i] == -1) {
-            rangeEnd[i] = 0;
+        while (i >= 0 && array[i] == -1) {
+            result[i] = 0;
 
             i--;
         }
 
         if (i == -1) {
-            // All bytes are equal to 0xFF, no upper bound should be used
+            // All bytes are equal to 0xFF, increment is not possible
             return null;
         } else {
-            rangeEnd[i] += 1;
+            result[i] += 1;
 
-            return rangeEnd;
+            return result;
         }
     }
 }
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
index f72a045473..d709581827 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
@@ -21,7 +21,6 @@ import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BE
 import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import org.rocksdb.AbstractEventListener;
 import org.rocksdb.FlushJobInfo;
@@ -51,7 +50,7 @@ class RocksDbFlushListener extends AbstractEventListener {
      * @param flusher Flusher instance to delegate events processing to.
      */
     RocksDbFlushListener(RocksDbFlusher flusher) {
-        super(EnabledEventCallback.ON_FLUSH_BEGIN, EnabledEventCallback.ON_FLUSH_COMPLETED);
+        super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED);
 
         this.flusher = flusher;
     }
@@ -67,13 +66,11 @@ class RocksDbFlushListener extends AbstractEventListener {
     /** {@inheritDoc} */
     @Override
     public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
-        ExecutorService threadPool = flusher.threadPool;
-
         if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
-            lastFlushProcessed = CompletableFuture.runAsync(flusher.onFlushCompleted, threadPool);
+            lastFlushProcessed = flusher.onFlushCompleted();
         }
 
         // Do it for every column family, there's no way to tell in advance which one has the latest sequence number.
-        lastFlushProcessed.whenCompleteAsync((o, throwable) -> flusher.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
+        lastFlushProcessed.whenCompleteAsync((o, throwable) -> flusher.completeFutures(flushJobInfo.getLargestSeqno()), flusher.threadPool);
     }
 }
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 9c907d079f..63ea3ea4c3 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@ public class RocksDbFlusher {
     private volatile RocksDB db;
 
     /** List of all column families. */
-    private volatile List<ColumnFamilyHandle> columnFamilyHandles;
+    private final List<ColumnFamilyHandle> columnFamilyHandles = new CopyOnWriteArrayList<>();
 
     /** Scheduled pool to schedule flushes. */
     private final ScheduledExecutorService scheduledPool;
@@ -59,7 +60,7 @@ public class RocksDbFlusher {
     private final IntSupplier delaySupplier;
 
     /** Flush completion callback. */
-    final Runnable onFlushCompleted;
+    private final Runnable onFlushCompleted;
 
     /**
      * Flush options to be used to asynchronously flush the Rocks DB memtable. It needs to be cached, because
@@ -68,7 +69,7 @@ public class RocksDbFlusher {
     private final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(false);
 
     /** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
-    final SortedMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();
+    private final SortedMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentSkipListMap<>();
 
     /** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
     private long latestPersistedSequenceNumber;
@@ -132,13 +133,20 @@ public class RocksDbFlusher {
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public void init(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles) {
         this.db = db;
-        this.columnFamilyHandles = columnFamilyHandles;
+        this.columnFamilyHandles.addAll(columnFamilyHandles);
 
         synchronized (latestPersistedSequenceNumberMux) {
             latestPersistedSequenceNumber = db.getLatestSequenceNumber();
         }
     }
 
+    /**
+     * Adds the given handle to the list of CF handles.
+     */
+    public void addColumnFamily(ColumnFamilyHandle handle) {
+        columnFamilyHandles.add(handle);
+    }
+
     /**
      * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
      * achieve this, by fixing its value at the time of invokation. Storage is considered flushed when at least one persisted column
@@ -235,4 +243,13 @@ public class RocksDbFlusher {
 
         flushOptions.close();
     }
+
+    /**
+     * Executes the {@code onFlushCompleted} callback.
+     *
+     * @return Future that completes when the {@code onFlushCompleted} callback finishes.
+     */
+    CompletableFuture<Void> onFlushCompleted() {
+        return CompletableFuture.runAsync(onFlushCompleted, threadPool);
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
index 259b82f2a3..7633d7aef7 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
@@ -118,8 +118,7 @@ public class BinaryConverter {
         }
 
         // Now compose the tuple.
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(
-                tupleSchema.elementCount(), hasNulls, estimatedValueSize);
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(tupleSchema.elementCount(), hasNulls, estimatedValueSize);
 
         for (int elementIndex = 0; elementIndex < tupleSchema.elementCount(); elementIndex++) {
             BinaryTupleSchema.Element elt = tupleSchema.element(elementIndex);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index c49e83353d..af762964f9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -94,4 +94,9 @@ public interface BinaryRow {
      * Get byte array of the row.
      */
     byte[] bytes();
+
+    /**
+     * Returns the representation of this row as a Byte Buffer.
+     */
+    ByteBuffer byteBuffer();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
new file mode 100644
index 0000000000..32e23dda2f
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+
+/**
+ * Class that represents a Binary Tuple Prefix.
+ *
+ * @see BinaryTuplePrefixBuilder BinaryTuplePrefixBuilder for information about the Binary Tuple Prefix format.
+ */
+public class BinaryTuplePrefix extends BinaryTupleReader implements InternalTuple {
+    /** Tuple schema. */
+    private final BinaryTupleSchema schema;
+
+    /**
+     * Constructor.
+     *
+     * @param schema Full Tuple schema.
+     * @param bytes Serialized representation of a Binary Tuple Prefix.
+     */
+    public BinaryTuplePrefix(BinaryTupleSchema schema, byte[] bytes) {
+        super(schema.elementCount() + 1, bytes);
+        this.schema = schema;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param schema Full Tuple schema.
+     * @param buffer Serialized representation of a Binary Tuple Prefix.
+     */
+    public BinaryTuplePrefix(BinaryTupleSchema schema, ByteBuffer buffer) {
+        super(schema.elementCount() + 1, buffer);
+        this.schema = schema;
+    }
+
+    @Override
+    public int count() {
+        return elementCount();
+    }
+
+    @Override
+    public BigDecimal decimalValue(int index) {
+        return decimalValue(index, schema.element(index).decimalScale);
+    }
+
+    @Override
+    public int elementCount() {
+        return intValue(super.elementCount() - 1);
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 614be90a74..a994c33551 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -124,4 +124,9 @@ public class ByteBufferRow implements BinaryRow {
 
         return tmp;
     }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        return buf.slice().order(ORDER);
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
index 67c712d726..63c2196206 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema.row;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -220,4 +221,9 @@ public interface InternalTuple {
      * @return Column value.
      */
     Instant timestampValue(int col);
+
+    /**
+     * Returns the representation of this tuple as a Byte Buffer.
+     */
+    ByteBuffer byteBuffer();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 4da400eaf9..b63deff5ff 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -809,6 +809,11 @@ public class Row implements BinaryRowEx, SchemaAware, InternalTuple {
         return row.bytes();
     }
 
+    @Override
+    public ByteBuffer byteBuffer() {
+        return row.byteBuffer();
+    }
+
     /** {@inheritDoc} */
     @Override
     public int colocationHash() {
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
new file mode 100644
index 0000000000..72574b7ced
--- /dev/null
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link BinaryTuplePrefix} class.
+ */
+public class BinaryTuplePrefixTest {
+
+    /**
+     * Tests construction of a BinaryTuple prefix.
+     */
+    @Test
+    public void testPrefix() {
+        BinaryTupleSchema schema = BinaryTupleSchema.create(new Element[]{
+                new Element(NativeTypes.INT32, false),
+                new Element(NativeTypes.STRING, false),
+                new Element(NativeTypes.DATE, false),
+                new Element(NativeTypes.UUID, false),
+                new Element(NativeTypes.DOUBLE, false)
+        });
+
+        var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+        LocalDate date = LocalDate.now();
+
+        ByteBuffer tuple = builder.appendInt(42)
+                .appendString("foobar")
+                .appendDate(date)
+                .build();
+
+        assertTrue(BinaryTupleCommon.isPrefix(tuple));
+
+        var prefix = new BinaryTuplePrefix(schema, tuple);
+
+        assertThat(prefix.count(), is(3));
+        assertThat(prefix.elementCount(), is(3));
+
+        assertThat(prefix.intValue(0), is(42));
+        assertThat(prefix.stringValue(1), is("foobar"));
+        assertThat(prefix.dateValue(2), is(date));
+        assertThat(prefix.uuidValue(3), is(nullValue()));
+        assertThat(prefix.doubleValueBoxed(4), is(nullValue()));
+    }
+
+    /**
+     * Tests construction of an invalid prefix.
+     */
+    @Test
+    public void testInvalidPrefix() {
+        Exception e = assertThrows(IllegalStateException.class, () -> {
+            var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+            builder.appendInt(42).build();
+        });
+
+        assertThat(e.getMessage(), is("Unexpected amount of elements in a BinaryTuple prefix. Expected: 3, actual 1"));
+
+        e = assertThrows(IllegalStateException.class, () -> {
+            var builder = new BinaryTuplePrefixBuilder(3, 5);
+
+            builder.appendInt(42)
+                    .appendInt(42)
+                    .appendInt(42)
+                    .appendInt(42)
+                    .build();
+        });
+
+        assertThat(e.getMessage(), is("Unexpected amount of elements in a BinaryTuple prefix. Expected: 3, actual 4"));
+    }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
similarity index 60%
rename from modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java
rename to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
index e36dd9671f..4fdc6eb4ed 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleComparator.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/BinaryTupleComparator.java
@@ -15,80 +15,70 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.index.impl;
+package org.apache.ignite.internal.storage.index;
 
+import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
 
 /**
- * Comparator implementation for comparting {@link BinaryTuple}s on a per-column basis.
+ * Comparator implementation for comparing {@link BinaryTuple}s on a per-column basis.
  */
-class BinaryTupleComparator implements Comparator<BinaryTuple> {
+public class BinaryTupleComparator implements Comparator<ByteBuffer> {
     private final SortedIndexDescriptor descriptor;
 
-    private final int prefixLength;
-
-    private BinaryTupleComparator(SortedIndexDescriptor descriptor, int prefixLength) {
-        if (prefixLength > descriptor.indexColumns().size()) {
-            throw new IllegalArgumentException("Invalid prefix length: " + prefixLength);
-        }
-
-        this.descriptor = descriptor;
-        this.prefixLength = prefixLength;
-    }
-
     /**
      * Creates a comparator for a Sorted Index identified by the given descriptor.
      */
-    static BinaryTupleComparator newComparator(SortedIndexDescriptor descriptor) {
-        return new BinaryTupleComparator(descriptor, descriptor.indexColumns().size());
-    }
-
-    /**
-     * Similar to {@link #newComparator} but creates a comparator that only compares first {@code prefixLength} index columns.
-     */
-    static BinaryTupleComparator newPrefixComparator(SortedIndexDescriptor descriptor, int prefixLength) {
-        return new BinaryTupleComparator(descriptor, prefixLength);
+    public BinaryTupleComparator(SortedIndexDescriptor descriptor) {
+        this.descriptor = descriptor;
     }
 
     @Override
-    public int compare(BinaryTuple tuple1, BinaryTuple tuple2) {
-        return compare(tuple1, tuple2, 1, 0);
-    }
+    public int compare(ByteBuffer buffer1, ByteBuffer buffer2) {
+        assert buffer1.order() == ByteOrder.LITTLE_ENDIAN;
+        assert buffer2.order() == ByteOrder.LITTLE_ENDIAN;
 
-    /**
-     * Compares a given tuple with the configured prefix.
-     *
-     * @param tuple1 Tuple to compare.
-     * @param tuple2 Tuple to compare.
-     * @param direction Sort direction: {@code -1} means sorting in reversed order, {@code 1} means  sorting in the natural order.
-     * @param equals Value that should be returned if the provided tuple exactly matches the prefix.
-     * @return the value {@code 0} if the given row starts with the configured prefix;
-     *         a value less than {@code 0} if the row's prefix is smaller than the prefix; and
-     *         a value greater than {@code 0} if the row's prefix is larger than the prefix.
-     */
-    public int compare(BinaryTuple tuple1, BinaryTuple tuple2, int direction, int equals) {
-        for (int i = 0; i < prefixLength; i++) {
+        boolean isBuffer1Prefix = isPrefix(buffer1);
+        boolean isBuffer2Prefix = isPrefix(buffer2);
+
+        assert !(isBuffer1Prefix && isBuffer2Prefix);
+
+        BinaryTupleSchema schema = descriptor.binaryTupleSchema();
+
+        InternalTuple tuple1 = isBuffer1Prefix ? new BinaryTuplePrefix(schema, buffer1) : new BinaryTuple(schema, buffer1);
+        InternalTuple tuple2 = isBuffer2Prefix ? new BinaryTuplePrefix(schema, buffer2) : new BinaryTuple(schema, buffer2);
+
+        int columnsToCompare = Math.min(tuple1.count(), tuple2.count());
+
+        assert columnsToCompare <= descriptor.indexColumns().size();
+
+        for (int i = 0; i < columnsToCompare; i++) {
             ColumnDescriptor columnDescriptor = descriptor.indexColumns().get(i);
 
             int compare = compareField(tuple1, tuple2, i);
 
             if (compare != 0) {
-                return direction * (columnDescriptor.asc() ? compare : -compare);
+                return columnDescriptor.asc() ? compare : -compare;
             }
         }
 
-        return equals;
+        return 0;
     }
 
     /**
      * Compares individual fields of two tuples.
      */
-    private int compareField(BinaryTuple tuple1, BinaryTuple tuple2, int index) {
+    private int compareField(InternalTuple tuple1, InternalTuple tuple2, int index) {
         boolean tuple1HasNull = tuple1.hasNullValue(index);
         boolean tuple2HasNull = tuple2.hasNullValue(index);
 
@@ -142,7 +132,7 @@ class BinaryTupleComparator implements Comparator<BinaryTuple> {
 
             default:
                 throw new IllegalArgumentException(String.format(
-                        "Unsupported column schema for creating a sorted index. Column name: %s, column type: %s",
+                        "Unsupported column type in binary tuple comparator. Column name: %s, column type: %s",
                         columnDescriptor.name(), columnDescriptor.type()
                 ));
         }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
index b547a30543..d72f24406e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
@@ -93,7 +93,6 @@ public class HashIndexDescriptor {
      * @param tableConfig Table configuration.
      * @param tablesConfig Tables and indexes configuration.
      * @param indexId Index id.
-     *
      */
     // TODO: IGNITE-17727 Fix redundant param.
     public HashIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
index 59a1c220e9..6e2361118b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesView;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
 import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
@@ -53,7 +55,28 @@ public class SortedIndexDescriptor {
 
         private final boolean asc;
 
-        ColumnDescriptor(ColumnView tableColumnView, IndexColumnView indexColumnView) {
+        /**
+         * Creates a Column Descriptor.
+         *
+         * @param name Name of the column.
+         * @param type Type of the column.
+         * @param nullable Flag indicating that the column may contain {@code null}s.
+         * @param asc Sort order of the column.
+         */
+        public ColumnDescriptor(String name, NativeType type, boolean nullable, boolean asc) {
+            this.name = name;
+            this.type = type;
+            this.nullable = nullable;
+            this.asc = asc;
+        }
+
+        /**
+         * Creates a Column Descriptor.
+         *
+         * @param tableColumnView Table column configuration.
+         * @param indexColumnView Index column configuration.
+         */
+        public ColumnDescriptor(ColumnView tableColumnView, IndexColumnView indexColumnView) {
             this.name = tableColumnView.name();
             this.type = SchemaDescriptorConverter.convert(SchemaConfigurationConverter.convert(tableColumnView.type()));
             this.nullable = tableColumnView.nullable();
@@ -98,14 +121,36 @@ public class SortedIndexDescriptor {
 
     private final List<ColumnDescriptor> columns;
 
+    private final BinaryTupleSchema binaryTupleSchema;
+
     /**
      * Creates an Index Descriptor from a given Table Configuration.
      *
-     * @param indexId index ID.
-     * @param tableConfig table configuration.
+     * @param indexId Index ID.
+     * @param tableConfig Table configuration.
      */
     // TODO: IGNITE-17727 Fix redundant param.
     public SortedIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
+        this(indexId, extractIndexColumnsConfiguration(indexId, tableConfig, tablesConfig));
+    }
+
+    /**
+     * Creates an Index Descriptor from a given set of column descriptors.
+     *
+     * @param indexId Index ID.
+     * @param columnDescriptors Column descriptors.
+     */
+    public SortedIndexDescriptor(UUID indexId, List<ColumnDescriptor> columnDescriptors) {
+        this.id = indexId;
+        this.columns = List.copyOf(columnDescriptors);
+        this.binaryTupleSchema = createSchema(columns);
+    }
+
+    private static List<ColumnDescriptor> extractIndexColumnsConfiguration(
+            UUID indexId,
+            TableView tableConfig,
+            TablesView tablesConfig
+    ) {
         TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tablesConfig.indexes(), indexId);
 
         if (indexConfig == null) {
@@ -119,11 +164,9 @@ public class SortedIndexDescriptor {
             ));
         }
 
-        this.id = indexId;
-
         NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
 
-        columns = indexColumns.namedListKeys().stream()
+        return indexColumns.namedListKeys().stream()
                 .map(columnName -> {
                     ColumnView columnView = tableConfig.columns().get(columnName);
 
@@ -136,6 +179,14 @@ public class SortedIndexDescriptor {
                 .collect(toUnmodifiableList());
     }
 
+    private static BinaryTupleSchema createSchema(List<ColumnDescriptor> columns) {
+        Element[] elements = columns.stream()
+                .map(columnDescriptor -> new Element(columnDescriptor.type(), columnDescriptor.nullable()))
+                .toArray(Element[]::new);
+
+        return BinaryTupleSchema.create(elements);
+    }
+
     /**
      * Returns this index' ID.
      */
@@ -149,4 +200,11 @@ public class SortedIndexDescriptor {
     public List<ColumnDescriptor> indexColumns() {
         return columns;
     }
+
+    /**
+     * Returns a {@code BinaryTupleSchema} that corresponds to the index configuration.
+     */
+    public BinaryTupleSchema binaryTupleSchema() {
+        return binaryTupleSchema;
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 2919759202..6e615b3729 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.storage.index;
 
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.util.Cursor;
 import org.intellij.lang.annotations.MagicConstant;
@@ -74,8 +74,8 @@ public interface SortedIndexStorage {
      * @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage.
      */
     Cursor<IndexRow> scan(
-            @Nullable BinaryTuple lowerBound,
-            @Nullable BinaryTuple upperBound,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
     );
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 3aceb2da46..70a1983bf5 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -52,7 +52,7 @@ import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.builder.SortedIndexDefinitionBuilder;
@@ -133,17 +133,12 @@ public abstract class AbstractSortedIndexStorageTest {
      *
      * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
      */
-    protected final void initialize(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
+    protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
         this.tablesCfg = tablesCfg;
-
-        createTestTable(tableCfg);
-    }
-
-    /** Set up internal storage implementation. */
-    protected final void initializeStorage(MvTableStorage tableStorage) {
         this.tableStorage = tableStorage;
-
         this.partitionStorage = tableStorage.getOrCreateMvPartition(TEST_PARTITION);
+
+        createTestTable(tableStorage.configuration());
     }
 
     /**
@@ -266,7 +261,7 @@ public abstract class AbstractSortedIndexStorageTest {
         put(index, row);
         put(index, row);
 
-        IndexRow actualRow = getSingle(index, row.indexColumns());
+        IndexRow actualRow = getSingle(index, serializer.serializeRowPrefix(columnValues));
 
         assertThat(actualRow.rowId(), is(equalTo(row.rowId())));
     }
@@ -402,8 +397,8 @@ public abstract class AbstractSortedIndexStorageTest {
                 .limit(lastIndex - firstIndex + 1)
                 .collect(toList());
 
-        BinaryTuple first = entries.get(firstIndex).prefix(3);
-        BinaryTuple last = entries.get(lastIndex).prefix(5);
+        BinaryTuplePrefix first = entries.get(firstIndex).prefix(3);
+        BinaryTuplePrefix last = entries.get(lastIndex).prefix(5);
 
         try (Cursor<IndexRow> cursor = indexStorage.scan(first, last, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
             List<IndexRow> actual = cursor.stream().collect(toList());
@@ -527,7 +522,7 @@ public abstract class AbstractSortedIndexStorageTest {
         put(indexStorage, entry1);
         put(indexStorage, entry2);
 
-        try (Cursor<IndexRow> cursor = indexStorage.scan(entry2.indexColumns(), entry1.indexColumns(), 0)) {
+        try (Cursor<IndexRow> cursor = indexStorage.scan(entry2.prefix(indexSchema.size()), entry1.prefix(indexSchema.size()), 0)) {
             assertThat(cursor.stream().collect(toList()), is(empty()));
         }
     }
@@ -556,10 +551,10 @@ public abstract class AbstractSortedIndexStorageTest {
             entry1 = t;
         }
 
-        try (Cursor<IndexRow> cursor = storage.scan(entry1.indexColumns(), entry2.indexColumns(), GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
+        try (Cursor<IndexRow> cursor = storage.scan(entry1.prefix(1), entry2.prefix(1), GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
             assertThat(
-                    cursor.stream().map(IndexRow::indexColumns).collect(toList()),
-                    contains(entry1.indexColumns(), entry2.indexColumns())
+                    cursor.stream().map(row -> row.indexColumns().byteBuffer()).collect(toList()),
+                    contains(entry1.indexColumns().byteBuffer(), entry2.indexColumns().byteBuffer())
             );
         }
     }
@@ -611,25 +606,25 @@ public abstract class AbstractSortedIndexStorageTest {
         put(indexStorage, entry2);
 
         assertThat(
-                getSingle(indexStorage, entry1.indexColumns()).rowId(),
+                getSingle(indexStorage, entry1.prefix(indexSchema.size())).rowId(),
                 is(equalTo(entry1.rowId()))
         );
 
         assertThat(
-                getSingle(indexStorage, entry2.indexColumns()).rowId(),
+                getSingle(indexStorage, entry2.prefix(indexSchema.size())).rowId(),
                 is(equalTo(entry2.rowId()))
         );
 
         remove(indexStorage, entry1);
 
-        assertThat(getSingle(indexStorage, entry1.indexColumns()), is(nullValue()));
+        assertThat(getSingle(indexStorage, entry1.prefix(indexSchema.size())), is(nullValue()));
     }
 
     /**
      * Extracts a single value by a given key or {@code null} if it does not exist.
      */
     @Nullable
-    private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuple fullPrefix) throws Exception {
+    private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuplePrefix fullPrefix) throws Exception {
         try (Cursor<IndexRow> cursor = indexStorage.scan(fullPrefix, fullPrefix, GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
             List<IndexRow> values = cursor.stream().collect(toList());
 
@@ -639,7 +634,7 @@ public abstract class AbstractSortedIndexStorageTest {
         }
     }
 
-    private static BinaryTuple prefix(SortedIndexStorage index, Object... vals) {
+    private static BinaryTuplePrefix prefix(SortedIndexStorage index, Object... vals) {
         var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
 
         return serializer.serializeRowPrefix(vals);
@@ -647,8 +642,8 @@ public abstract class AbstractSortedIndexStorageTest {
 
     private static List<Object[]> scan(
             SortedIndexStorage index,
-            @Nullable BinaryTuple lowerBound,
-            @Nullable BinaryTuple upperBound,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
     ) throws Exception {
         var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java
new file mode 100644
index 0000000000..14e70d71f5
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/BinaryTupleComparatorTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for the {@link BinaryTupleComparator} class.
+ */
+public class BinaryTupleComparatorTest {
+
+    @ParameterizedTest
+    @MethodSource("allTypes")
+    public void testCompareSingleColumnTuples(NativeType type) {
+        var columnDescriptor = new ColumnDescriptor("column", type, false, true);
+
+        var descriptor = new SortedIndexDescriptor(UUID.randomUUID(), List.of(columnDescriptor));
+
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        IgniteBiTuple<ByteBuffer, ByteBuffer> tuples = createTestValues(type);
+
+        assertThat(comparator.compare(tuples.get1(), tuples.get2()), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuples.get1(), tuples.get1()), is(0));
+        assertThat(comparator.compare(tuples.get2(), tuples.get2()), is(0));
+        assertThat(comparator.compare(tuples.get2(), tuples.get1()), is(greaterThanOrEqualTo(1)));
+    }
+
+    private static List<NativeType> allTypes() {
+        return List.of(
+                NativeTypes.INT8,
+                NativeTypes.INT16,
+                NativeTypes.INT32,
+                NativeTypes.INT64,
+                NativeTypes.FLOAT,
+                NativeTypes.DOUBLE,
+                NativeTypes.BYTES,
+                NativeTypes.bitmaskOf(42),
+                NativeTypes.decimalOf(20, 3),
+                NativeTypes.UUID,
+                NativeTypes.STRING,
+                NativeTypes.numberOf(20),
+                NativeTypes.timestamp(),
+                NativeTypes.DATE,
+                NativeTypes.datetime()
+        );
+    }
+
+    private static IgniteBiTuple<ByteBuffer, ByteBuffer> createTestValues(NativeType type) {
+        ByteBuffer tuple1;
+        ByteBuffer tuple2;
+
+        switch (type.spec()) {
+            case INT8: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendByte((byte) -1)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendByte(Byte.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case INT16: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendShort((short) -1)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendShort(Short.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case INT32: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendInt(-1)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendInt(Integer.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case INT64: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendLong(-1)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendLong(Long.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case FLOAT: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendFloat(-1.69f)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendFloat(Float.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case DOUBLE: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendDouble(-1.69)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendDouble(Double.MAX_VALUE)
+                        .build();
+
+                break;
+            }
+
+            case BYTES: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendBytes(new byte[] {1, 2, 3})
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendBytes(new byte[] {1, 2, 6})
+                        .build();
+
+                break;
+            }
+
+            case BITMASK: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendBitmask(BitSet.valueOf(new byte[] { 1, 2, 3 }))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendBitmask(BitSet.valueOf(new byte[] {-1, -1, -1}))
+                        .build();
+
+                break;
+            }
+
+            case DECIMAL: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendDecimal(BigDecimal.valueOf(-1), 4)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendDecimal(BigDecimal.valueOf(123456789.1234), 4)
+                        .build();
+
+                break;
+            }
+
+            case UUID: {
+                UUID uuid1 = UUID.randomUUID();
+                UUID uuid2 = UUID.randomUUID();
+
+                if (uuid1.compareTo(uuid2) > 0) {
+                    UUID t = uuid1;
+                    uuid1 = uuid2;
+                    uuid2 = t;
+                }
+
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendUuid(uuid1)
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendUuid(uuid2)
+                        .build();
+
+                break;
+            }
+
+            case STRING: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendString("foobar")
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendString("foobaz")
+                        .build();
+
+                break;
+            }
+
+            case NUMBER: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendNumber(BigInteger.valueOf(-1))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendNumber(BigInteger.TEN)
+                        .build();
+
+                break;
+            }
+
+            case TIMESTAMP: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendTimestamp(Instant.ofEpochSecond(1))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendTimestamp(Instant.ofEpochSecond(42))
+                        .build();
+
+                break;
+            }
+
+            case DATE: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendDate(LocalDate.of(2000, 4, 10))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendDate(LocalDate.of(2000, 4, 15))
+                        .build();
+
+                break;
+            }
+
+            case TIME: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendTime(LocalTime.of(10, 0, 0))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendTime(LocalTime.of(10, 0, 1))
+                        .build();
+
+                break;
+            }
+
+            case DATETIME: {
+                tuple1 = new BinaryTupleBuilder(1, false)
+                        .appendDateTime(LocalDateTime.of(2000, 4, 10, 10, 0, 0))
+                        .build();
+
+                tuple2 = new BinaryTupleBuilder(1, false)
+                        .appendDateTime(LocalDateTime.of(2000, 4, 10, 10, 0, 1))
+                        .build();
+
+                break;
+            }
+
+            default:
+                throw new AssertionError(type.toString());
+        }
+
+        return new IgniteBiTuple<>(tuple1, tuple2);
+    }
+
+    @Test
+    public void testCompareMultipleColumnTuples() {
+        List<ColumnDescriptor> columnDescriptors = List.of(
+                new ColumnDescriptor("column", NativeTypes.INT32, false, true),
+                new ColumnDescriptor("column", NativeTypes.STRING, false,  false)
+        );
+
+        var descriptor = new SortedIndexDescriptor(UUID.randomUUID(), columnDescriptors);
+
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        ByteBuffer tuple1 = new BinaryTupleBuilder(2, false)
+                .appendInt(0)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer tuple2 = new BinaryTupleBuilder(2, false)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple1, tuple1), is(0));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTupleBuilder(2, false)
+                .appendInt(0)
+                .appendString("foobaa")
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+    }
+
+    @Test
+    public void testCompareMultipleColumnTuplesWithNulls() {
+        List<ColumnDescriptor> columnDescriptors = List.of(
+                new ColumnDescriptor("column", NativeTypes.INT32, true, true),
+                new ColumnDescriptor("column", NativeTypes.STRING, true,  false)
+        );
+
+        var descriptor = new SortedIndexDescriptor(UUID.randomUUID(), columnDescriptors);
+
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        ByteBuffer tuple1 = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer tuple2 = new BinaryTupleBuilder(2, true)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple1, tuple1), is(0));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobaa")
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString(null)
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+        assertThat(comparator.compare(tuple2, tuple2), is(0));
+    }
+
+    @Test
+    public void testCompareWithPrefix() {
+        List<ColumnDescriptor> columnDescriptors = List.of(
+                new ColumnDescriptor("column", NativeTypes.INT32, false, true),
+                new ColumnDescriptor("column", NativeTypes.STRING, false,  false)
+        );
+
+        var descriptor = new SortedIndexDescriptor(UUID.randomUUID(), columnDescriptors);
+
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        ByteBuffer tuple1 = new BinaryTupleBuilder(2, false)
+                .appendInt(1)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer tuple2 = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(2)
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(0)
+                .build();
+
+        assertThat(comparator.compare(tuple2, tuple1), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple1, tuple2), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(1)
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(0));
+    }
+
+    @Test
+    public void testCompareWithPrefixWithNulls() {
+        List<ColumnDescriptor> columnDescriptors = List.of(
+                new ColumnDescriptor("column", NativeTypes.INT32, true, true),
+                new ColumnDescriptor("column", NativeTypes.STRING, false,  false)
+        );
+
+        var descriptor = new SortedIndexDescriptor(UUID.randomUUID(), columnDescriptors);
+
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        ByteBuffer tuple1 = new BinaryTupleBuilder(2, true)
+                .appendInt(null)
+                .appendString("foobar")
+                .build();
+
+        ByteBuffer tuple2 = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(0)
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(lessThanOrEqualTo(-1)));
+        assertThat(comparator.compare(tuple2, tuple1), is(greaterThanOrEqualTo(1)));
+
+        tuple2 = new BinaryTuplePrefixBuilder(1, 2)
+                .appendInt(null)
+                .build();
+
+        assertThat(comparator.compare(tuple1, tuple2), is(0));
+    }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
index 46589ded86..fcb2b3a4bb 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
@@ -66,8 +66,8 @@ public class TestSortedIndexStorageTest extends AbstractSortedIndexStorageTest {
                     EntryCountBudgetConfigurationSchema.class
             })
             TablesConfiguration tablesConfig) {
-        initialize(tableCfg, tablesConfig);
+        var storage = new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig);
 
-        initializeStorage(new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig));
+        initialize(storage, tablesConfig);
     }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
index 530db7c811..6c5951e6b2 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/impl/TestIndexRow.java
@@ -29,6 +29,7 @@ import java.util.Random;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexRow;
@@ -81,7 +82,7 @@ public class TestIndexRow implements IndexRow, Comparable<TestIndexRow> {
     /**
      * Creates an Index Key prefix of the given length.
      */
-    public BinaryTuple prefix(int length) {
+    public BinaryTuplePrefix prefix(int length) {
         return serializer.serializeRowPrefix(Arrays.copyOf(columns, length));
     }
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index e8deddc46f..2642701dee 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -191,13 +191,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
      */
     @Test
     public void testDestroyIndex() {
-        tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
         assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()), is(notNullValue()));
         assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()), is(notNullValue()));
 
-        assertThat(tableStorage.destroyIndex(sortedIdx.id()), willCompleteSuccessfully());
-        assertThat(tableStorage.destroyIndex(hashIdx.id()), willCompleteSuccessfully());
+        CompletableFuture<Void> destroySortedIndexFuture = tableStorage.destroyIndex(sortedIdx.id());
+        CompletableFuture<Void> destroyHashIndexFuture = tableStorage.destroyIndex(hashIdx.id());
+
+        assertThat(partitionStorage.flush(), willCompleteSuccessfully());
+        assertThat(destroySortedIndexFuture, willCompleteSuccessfully());
+        assertThat(destroyHashIndexFuture, willCompleteSuccessfully());
     }
 
     @Test
@@ -223,7 +227,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
                 new Element(NativeTypes.INT32, false)
         });
 
-        ByteBuffer buffer = BinaryTupleBuilder.create(schema.elementCount(), schema.hasNullableElements())
+        ByteBuffer buffer = new BinaryTupleBuilder(schema.elementCount(), schema.hasNullableElements())
                 .appendInt(1)
                 .appendInt(2)
                 .build();
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
index e248511584..09d966abba 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
@@ -64,7 +64,12 @@ public abstract class AbstractHashIndexStorageTest {
 
     private BinaryTupleRowSerializer serializer;
 
-    protected void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
         this.tableStorage = tableStorage;
 
         createTestTable(tableStorage.configuration());
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
index 798d70bd22..9be9372135 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
@@ -29,7 +29,9 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.InvalidTypeException;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.schema.SchemaMismatchException;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 
 /**
@@ -58,22 +61,34 @@ public class BinaryTupleRowSerializer {
 
     private final List<ColumnDescriptor> schema;
 
+    private final BinaryTupleSchema tupleSchema;
+
     /**
      * Creates a new instance for a Sorted Index.
      */
     public BinaryTupleRowSerializer(SortedIndexDescriptor descriptor) {
-        this.schema = descriptor.indexColumns().stream()
+        this(descriptor.indexColumns().stream()
                 .map(colDesc -> new ColumnDescriptor(colDesc.type(), colDesc.nullable()))
-                .collect(toUnmodifiableList());
+                .collect(toUnmodifiableList()));
     }
 
     /**
      * Creates a new instance for a Hash Index.
      */
     public BinaryTupleRowSerializer(HashIndexDescriptor descriptor) {
-        this.schema = descriptor.indexColumns().stream()
+        this(descriptor.indexColumns().stream()
                 .map(colDesc -> new ColumnDescriptor(colDesc.type(), colDesc.nullable()))
-                .collect(toUnmodifiableList());
+                .collect(toUnmodifiableList()));
+    }
+
+    private BinaryTupleRowSerializer(List<ColumnDescriptor> schema) {
+        this.schema = schema;
+
+        Element[] elements = schema.stream()
+                .map(columnDescriptor -> new Element(columnDescriptor.type, columnDescriptor.nullable))
+                .toArray(Element[]::new);
+
+        tupleSchema = BinaryTupleSchema.create(elements);
     }
 
     /**
@@ -88,13 +103,21 @@ public class BinaryTupleRowSerializer {
             ));
         }
 
-        return new IndexRowImpl(serializeRowPrefix(columnValues), rowId);
+        var builder = new BinaryTupleBuilder(tupleSchema.elementCount(), tupleSchema.hasNullableElements());
+
+        for (Object value : columnValues) {
+            appendValue(builder, value);
+        }
+
+        var tuple = new BinaryTuple(tupleSchema, builder.build());
+
+        return new IndexRowImpl(tuple, rowId);
     }
 
     /**
      * Creates a prefix of an {@link IndexRow} using the provided columns.
      */
-    public BinaryTuple serializeRowPrefix(Object[] prefixColumnValues) {
+    public BinaryTuplePrefix serializeRowPrefix(Object[] prefixColumnValues) {
         if (prefixColumnValues.length > schema.size()) {
             throw new IllegalArgumentException(String.format(
                     "Incorrect number of column values passed. Expected not more than %d, got %d",
@@ -103,21 +126,13 @@ public class BinaryTupleRowSerializer {
             ));
         }
 
-        Element[] prefixElements = schema.stream()
-                .limit(prefixColumnValues.length)
-                .map(columnDescriptor -> new Element(columnDescriptor.type, columnDescriptor.nullable))
-                .toArray(Element[]::new);
-
-        BinaryTupleSchema prefixSchema = BinaryTupleSchema.create(prefixElements);
-
-        BinaryTupleBuilder builder = BinaryTupleBuilder.create(
-                prefixSchema.elementCount(), prefixSchema.hasNullableElements());
+        var builder = new BinaryTuplePrefixBuilder(prefixColumnValues.length, schema.size());
 
         for (Object value : prefixColumnValues) {
-            appendValue(builder, prefixSchema, value);
+            appendValue(builder, value);
         }
 
-        return new BinaryTuple(prefixSchema, builder.build());
+        return new BinaryTuplePrefix(tupleSchema, builder.build());
     }
 
     /**
@@ -143,12 +158,11 @@ public class BinaryTupleRowSerializer {
      * Append a value for the current element.
      *
      * @param builder Builder.
-     * @param schema Tuple schema.
      * @param value Element value.
      * @return Builder for chaining.
      */
-    private static BinaryTupleBuilder appendValue(BinaryTupleBuilder builder, BinaryTupleSchema schema, Object value) {
-        Element element = schema.element(builder.elementIndex());
+    private BinaryTupleBuilder appendValue(BinaryTupleBuilder builder, Object value) {
+        Element element = tupleSchema.element(builder.elementIndex());
 
         if (value == null) {
             if (!element.nullable()) {
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java
deleted file mode 100644
index db8be5b72a..0000000000
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/IndexRowImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.index.impl;
-
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.index.IndexRow;
-
-/**
- * {@link IndexRow} implementation that simply stores the provided parameters.
- */
-class IndexRowImpl implements IndexRow {
-    private final BinaryTuple indexColumns;
-
-    private final RowId rowId;
-
-    IndexRowImpl(BinaryTuple indexColumns, RowId rowId) {
-        this.indexColumns = indexColumns;
-        this.rowId = rowId;
-    }
-
-    @Override
-    public BinaryTuple indexColumns() {
-        return indexColumns;
-    }
-
-    @Override
-    public RowId rowId() {
-        return rowId;
-    }
-}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index d054204b5a..f0c45d67f2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -19,16 +19,22 @@ package org.apache.ignite.internal.storage.index.impl;
 
 import static org.apache.ignite.internal.util.IgniteUtils.capacity;
 
+import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.NavigableMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.ToIntFunction;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
 import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.util.Cursor;
@@ -38,7 +44,9 @@ import org.jetbrains.annotations.Nullable;
  * Test implementation of MV sorted index storage.
  */
 public class TestSortedIndexStorage implements SortedIndexStorage {
-    private final ConcurrentNavigableMap<BinaryTuple, Set<RowId>> index;
+    private final ConcurrentNavigableMap<ByteBuffer, Set<RowId>> index;
+
+    private final Comparator<ByteBuffer> binaryTupleComparator;
 
     private final SortedIndexDescriptor descriptor;
 
@@ -47,7 +55,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
      */
     public TestSortedIndexStorage(SortedIndexDescriptor descriptor) {
         this.descriptor = descriptor;
-        this.index = new ConcurrentSkipListMap<>(BinaryTupleComparator.newComparator(descriptor));
+        this.binaryTupleComparator = new BinaryTupleComparator(descriptor);
+        this.index = new ConcurrentSkipListMap<>(binaryTupleComparator);
     }
 
     @Override
@@ -57,7 +66,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
     @Override
     public void put(IndexRow row) {
-        index.compute(row.indexColumns(), (k, v) -> {
+        index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
             if (v == null) {
                 return Set.of(row.rowId());
             } else if (v.contains(row.rowId())) {
@@ -75,7 +84,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
     @Override
     public void remove(IndexRow row) {
-        index.computeIfPresent(row.indexColumns(), (k, v) -> {
+        index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
             if (v.contains(row.rowId())) {
                 if (v.size() == 1) {
                     return null;
@@ -95,31 +104,45 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
     /** {@inheritDoc} */
     @Override
     public Cursor<IndexRow> scan(
-            @Nullable BinaryTuple lowerBound,
-            @Nullable BinaryTuple upperBound,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
             int flags
     ) {
         boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
         boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
 
-        NavigableMap<BinaryTuple, Set<RowId>> index = this.index;
-        int direction = 1;
+        Stream<Map.Entry<ByteBuffer, Set<RowId>>> data = index.entrySet().stream();
+
+        if (lowerBound != null) {
+            ToIntFunction<ByteBuffer> lowerCmp = boundComparator(lowerBound, includeLower ? 0 : -1);
+
+            data = data.dropWhile(e -> lowerCmp.applyAsInt(e.getKey()) < 0);
+        }
 
-        ToIntFunction<BinaryTuple> lowerCmp = lowerBound == null ? row -> 1 : boundComparator(lowerBound, direction, includeLower ? 0 : -1);
-        ToIntFunction<BinaryTuple> upperCmp = upperBound == null ? row -> -1 : boundComparator(upperBound, direction, includeUpper ? 0 : 1);
+        if (upperBound != null) {
+            ToIntFunction<ByteBuffer> upperCmp = boundComparator(upperBound, includeUpper ? 0 : 1);
 
-        Iterator<? extends IndexRow> iterator = index.entrySet().stream()
-                .dropWhile(e -> lowerCmp.applyAsInt(e.getKey()) < 0)
-                .takeWhile(e -> upperCmp.applyAsInt(e.getKey()) <= 0)
-                .flatMap(e -> e.getValue().stream().map(rowId -> new IndexRowImpl(e.getKey(), rowId)))
+            data = data.takeWhile(e -> upperCmp.applyAsInt(e.getKey()) <= 0);
+        }
+
+        Iterator<? extends IndexRow> iterator = data
+                .flatMap(e -> {
+                    var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), e.getKey());
+
+                    return e.getValue().stream().map(rowId -> new IndexRowImpl(tuple, rowId));
+                })
                 .iterator();
 
         return Cursor.fromIterator(iterator);
     }
 
-    private ToIntFunction<BinaryTuple> boundComparator(BinaryTuple bound, int direction, int equals) {
-        BinaryTupleComparator comparator = BinaryTupleComparator.newPrefixComparator(descriptor, bound.count());
+    private ToIntFunction<ByteBuffer> boundComparator(BinaryTuplePrefix bound, int equals) {
+        ByteBuffer boundBuffer = bound.byteBuffer();
+
+        return tuple -> {
+            int compare = binaryTupleComparator.compare(tuple, boundBuffer);
 
-        return tuple -> comparator.compare(tuple, bound, direction, equals);
+            return compare == 0 ? equals : compare;
+        };
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 6a4e33c0e1..c49a1ebe49 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import java.nio.charset.StandardCharsets;
+import java.util.UUID;
 import org.rocksdb.RocksDB;
 
 /**
@@ -39,11 +40,16 @@ class ColumnFamilyUtils {
      */
     static final String HASH_INDEX_CF_NAME = "cf-hash";
 
+    /**
+     * Prefix for SQL indexes column family names.
+     */
+    static final String SORTED_INDEX_CF_PREFIX = "cf-sorted-";
+
     /**
      * Utility enum to describe a type of the column family - meta or partition.
      */
     enum ColumnFamilyType {
-        META, PARTITION, HASH_INDEX, UNKNOWN;
+        META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN;
 
         /**
          * Determines column family type by its name.
@@ -58,9 +64,35 @@ class ColumnFamilyUtils {
                 return PARTITION;
             } else if (HASH_INDEX_CF_NAME.equals(cfName)) {
                 return HASH_INDEX;
+            } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
+                return SORTED_INDEX;
             } else {
                 return UNKNOWN;
             }
         }
     }
+
+    /**
+     * Creates a column family name by index ID.
+     *
+     * @param indexId Index ID.
+     * @return Column family name.
+     *
+     * @see #sortedIndexId
+     */
+    static String sortedIndexCfName(UUID indexId) {
+        return SORTED_INDEX_CF_PREFIX + indexId;
+    }
+
+    /**
+     * Extracts a Sorted Index ID from the given Column Family name.
+     *
+     * @param cfName Column Family name.
+     * @return Sorted Index ID.
+     *
+     * @see #sortedIndexCfName
+     */
+    static UUID sortedIndexId(String cfName) {
+        return UUID.fromString(cfName.substring(SORTED_INDEX_CF_PREFIX.length()));
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
similarity index 77%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index ea6294f6cb..5a61b7c992 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -24,22 +24,34 @@ import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
 
-class HashIndexes {
+/**
+ * Class that represents a Hash Index defined for all partitions of a Table.
+ */
+class HashIndex {
+    private final ColumnFamily indexCf;
+
     private final HashIndexDescriptor descriptor;
 
     private final ConcurrentMap<Integer, HashIndexStorage> storages = new ConcurrentHashMap<>();
 
-    HashIndexes(HashIndexDescriptor descriptor) {
+    HashIndex(ColumnFamily indexCf, HashIndexDescriptor descriptor) {
+        this.indexCf = indexCf;
         this.descriptor = descriptor;
     }
 
-    HashIndexStorage getOrCreateStorage(ColumnFamily indexCf, RocksDbMvPartitionStorage partitionStorage) {
+    /**
+     * Creates a new Hash Index storage or returns an existing one.
+     */
+    HashIndexStorage getOrCreateStorage(RocksDbMvPartitionStorage partitionStorage) {
         return storages.computeIfAbsent(
                 partitionStorage.partitionId(),
                 partId -> new RocksDbHashIndexStorage(descriptor, indexCf, partitionStorage)
         );
     }
 
+    /**
+     * Removes all data associated with the index.
+     */
     void destroy() {
         storages.forEach((partitionId, storage) -> storage.destroy());
     }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 370298ca9c..fd3857e348 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -44,7 +44,7 @@ class RocksDbMetaStorage {
     /**
      * Name of the key that is out of range of the partition ID key prefix, used as an exclusive bound.
      */
-    private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.rangeEnd(PARTITION_ID_PREFIX);
+    private static final byte[] PARTITION_ID_PREFIX_END = RocksUtils.incrementArray(PARTITION_ID_PREFIX);
 
     private final ColumnFamily metaColumnFamily;
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index bded4516ee..65bae91ba3 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -47,8 +49,10 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.ColumnFamilyType;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -108,10 +112,13 @@ public class RocksDbTableStorage implements MvTableStorage {
     private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
 
     /** Hash Index storages by Index IDs. */
-    private final ConcurrentMap<UUID, HashIndexes> hashIndices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, HashIndex> hashIndices = new ConcurrentHashMap<>();
+
+    /** Sorted Index storages by Index IDs. */
+    private final ConcurrentMap<UUID, SortedIndex> sortedIndices = new ConcurrentHashMap<>();
 
     /** Busy lock to stop synchronously. */
-    final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Prevents double stopping the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
@@ -224,6 +231,15 @@ public class RocksDbTableStorage implements MvTableStorage {
 
                         break;
 
+                    case SORTED_INDEX:
+                        UUID indexId = sortedIndexId(cf.name());
+
+                        var indexDescriptor = new SortedIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
+
+                        sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
+
+                        break;
+
                     default:
                         throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
                                 + tableCfg.value().name() + ']');
@@ -301,6 +317,7 @@ public class RocksDbTableStorage implements MvTableStorage {
         resources.add(meta.columnFamily().handle());
         resources.add(partitionCf.handle());
         resources.add(hashIndexCf.handle());
+        resources.addAll(sortedIndices.values());
 
         resources.add(db);
 
@@ -385,38 +402,71 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** {@inheritDoc} */
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
+
+        RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+
+        if (partitionStorage == null) {
+            throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+        }
+
+        return storages.getOrCreateStorage(partitionStorage);
+    }
+
+    private SortedIndex createSortedIndex(UUID indexId) {
+        var indexDescriptor = new SortedIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
+
+        ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(sortedIndexCfName(indexId), indexDescriptor);
+
+        ColumnFamily columnFamily;
+        try {
+            columnFamily = ColumnFamily.create(db, cfDescriptor);
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to create new RocksDB column family: " + new String(cfDescriptor.getName(), UTF_8), e);
+        }
+
+        flusher.addColumnFamily(columnFamily.handle());
+
+        return new SortedIndex(columnFamily, indexDescriptor);
     }
 
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
-        HashIndexes storages = hashIndices.computeIfAbsent(indexId, id -> {
-            var indexDescriptor = new HashIndexDescriptor(id, tableCfg.value(), tablesCfg.value());
+        HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
+            var indexDescriptor = new HashIndexDescriptor(indexId, tableCfg.value(), tablesCfg.value());
 
-            return new HashIndexes(indexDescriptor);
+            return new HashIndex(hashIndexCf, indexDescriptor);
         });
 
         RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
 
         if (partitionStorage == null) {
-            throw new StorageException(String.format("Partition %d has not been created yet", partitionId));
+            throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
         }
 
-        return storages.getOrCreateStorage(hashIndexCf, partitionStorage);
+        return storages.getOrCreateStorage(partitionStorage);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> destroyIndex(UUID indexId) {
-        HashIndexes storages = hashIndices.remove(indexId);
+        HashIndex hashIdx = hashIndices.remove(indexId);
 
-        if (storages == null) {
-            return CompletableFuture.completedFuture(null);
+        if (hashIdx != null) {
+            hashIdx.destroy();
         }
 
-        storages.destroy();
+        SortedIndex sortedIdx = sortedIndices.remove(indexId);
 
-        return awaitFlush(false);
+        if (sortedIdx != null) {
+            sortedIdx.destroy();
+        }
+
+        if (hashIdx == null && sortedIdx == null) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return awaitFlush(false);
+        }
     }
 
     /** {@inheritDoc} */
@@ -495,8 +545,24 @@ public class RocksDbTableStorage implements MvTableStorage {
                         new ColumnFamilyOptions().useFixedLengthPrefixExtractor(RocksDbHashIndexStorage.FIXED_PREFIX_LENGTH)
                 );
 
+            case SORTED_INDEX:
+                var indexDescriptor = new SortedIndexDescriptor(sortedIndexId(cfName), tableCfg.value(), tablesCfg.value());
+
+                return sortedIndexCfDescriptor(cfName, indexDescriptor);
+
             default:
                 throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.value().name() + ']');
         }
     }
+
+    /**
+     * Creates a Column Family descriptor for a Sorted Index.
+     */
+    private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, SortedIndexDescriptor descriptor) {
+        var comparator = new RocksDbBinaryTupleComparator(descriptor);
+
+        ColumnFamilyOptions options = new ColumnFamilyOptions().setComparator(comparator);
+
+        return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), options);
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
new file mode 100644
index 0000000000..f78447374e
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Class that represents a Sorted Index defined for all partitions of a Table.
+ */
+class SortedIndex implements AutoCloseable {
+    private final SortedIndexDescriptor descriptor;
+
+    private final ColumnFamily indexCf;
+
+    private final ConcurrentMap<Integer, SortedIndexStorage> storages = new ConcurrentHashMap<>();
+
+    SortedIndex(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
+        this.descriptor = descriptor;
+        this.indexCf = indexCf;
+    }
+
+    /**
+     * Creates a new Sorted Index storage or returns an existing one.
+     */
+    SortedIndexStorage getOrCreateStorage(RocksDbMvPartitionStorage partitionStorage) {
+        return storages.computeIfAbsent(
+                partitionStorage.partitionId(),
+                partId -> new RocksDbSortedIndexStorage(descriptor, indexCf, partitionStorage)
+        );
+    }
+
+    /**
+     * Removes all data associated with the index.
+     */
+    void destroy() {
+        try {
+            indexCf.destroy();
+        } catch (RocksDBException e) {
+            throw new StorageException("Unable to destroy index " + descriptor.id(), e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        indexCf.handle().close();
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java
new file mode 100644
index 0000000000..050023d63d
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtils.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for working with cursors.
+ */
+class CursorUtils {
+    /**
+     * Cursor wrapper that discards elements while they match a given predicate. As soon as any element does not match the predicate,
+     * no more elements will be discarded.
+     *
+     * @param <T> Cursor element type.
+     */
+    private static class DropWhileCursor<T> implements Cursor<T> {
+        private final Cursor<T> cursor;
+
+        @Nullable
+        private Predicate<T> predicate;
+
+        @Nullable
+        private T firstNotMatchedElement;
+
+        DropWhileCursor(Cursor<T> cursor, Predicate<T> predicate) {
+            this.cursor = cursor;
+            this.predicate = predicate;
+        }
+
+        @Override
+        public void close() throws Exception {
+            cursor.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (predicate == null) {
+                return firstNotMatchedElement != null || cursor.hasNext();
+            }
+
+            while (cursor.hasNext()) {
+                firstNotMatchedElement = cursor.next();
+
+                if (!predicate.test(firstNotMatchedElement)) {
+                    predicate = null;
+
+                    break;
+                }
+            }
+
+            return predicate == null;
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            if (firstNotMatchedElement != null) {
+                T next = firstNotMatchedElement;
+
+                firstNotMatchedElement = null;
+
+                return next;
+            } else {
+                return cursor.next();
+            }
+        }
+    }
+
+    /**
+     * Creates a cursor wrapper that discards elements while they match a given predicate. As soon as any element does not match the
+     * predicate, no more elements will be discarded.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param predicate Predicate for elements to be discarded.
+     * @param <T> Cursor element type.
+     * @return Cursor wrapper.
+     */
+    static <T> Cursor<T> dropWhile(Cursor<T> cursor, Predicate<T> predicate) {
+        return new DropWhileCursor<>(cursor, predicate);
+    }
+
+    /**
+     * Cursor wrapper that discards elements if they don't match a given predicate. As soon as any element does not match the predicate,
+     * all following elements will be discarded.
+     *
+     * @param <T> Cursor element type.
+     */
+    private static class TakeWhileCursor<T> implements Cursor<T> {
+        private final Cursor<T> cursor;
+
+        @Nullable
+        private Predicate<T> predicate;
+
+        @Nullable
+        private T next;
+
+        TakeWhileCursor(Cursor<T> cursor, Predicate<T> predicate) {
+            this.cursor = cursor;
+            this.predicate = predicate;
+        }
+
+        @Override
+        public void close() throws Exception {
+            cursor.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null) {
+                return true;
+            } else if (predicate == null || !cursor.hasNext()) {
+                return false;
+            }
+
+            next = cursor.next();
+
+            if (predicate.test(next)) {
+                return true;
+            } else {
+                predicate = null;
+                next = null;
+
+                return false;
+            }
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            T result = next;
+
+            next = null;
+
+            return result;
+        }
+    }
+
+    /**
+     * Creates a cursor wrapper that discards elements if they don't match a given predicate. As soon as any element does not match the
+     * predicate, all following elements will be discarded.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param predicate Predicate for elements to be kept.
+     * @param <T> Cursor element type.
+     * @return Cursor wrapper.
+     */
+    static <T> Cursor<T> takeWhile(Cursor<T> cursor, Predicate<T> predicate) {
+        return new TakeWhileCursor<>(cursor, predicate);
+    }
+
+    /**
+     * Cursor wrapper that transforms the underlying cursor's data using the provided mapping function.
+     *
+     * @param <T> Type of the original data.
+     * @param <U> Type of the transformed data.
+     */
+    private static class MapCursor<T, U> implements Cursor<U> {
+        private final Cursor<T> cursor;
+
+        private final Function<T, U> mapper;
+
+        MapCursor(Cursor<T> cursor, Function<T, U> mapper) {
+            this.cursor = cursor;
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() throws Exception {
+            cursor.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cursor.hasNext();
+        }
+
+        @Override
+        public U next() {
+            return mapper.apply(cursor.next());
+        }
+    }
+
+    /**
+     * Creates a cursor wrapper that transforms the underlying cursor's data using the provided mapping function.
+     *
+     * @param cursor Underlying cursor with data.
+     * @param mapper Function to transform the elements of the underlying cursor.
+     * @param <T> Type of the original data.
+     * @param <U> Type of the transformed data.
+     * @return Cursor wrapper.
+     */
+    static <T, U> Cursor<U> map(Cursor<T> cursor, Function<T, U> mapper) {
+        return new MapCursor<>(cursor, mapper);
+    }
+
+    /**
+     * Creates a cursor that iterates over both given cursors.
+     *
+     * @param a First cursor.
+     * @param b Second cursor.
+     * @param <T> Cursor element type.
+     * @return Cursor that iterates over both given cursors.
+     */
+    static <T> Cursor<T> concat(Cursor<T> a, Cursor<T> b) {
+        return new Cursor<>() {
+            private Cursor<T> currentCursor = a;
+
+            @Override
+            public void close() throws Exception {
+                IgniteUtils.closeAll(a, b);
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (currentCursor.hasNext()) {
+                    return true;
+                } else if (currentCursor == b) {
+                    return false;
+                } else {
+                    currentCursor = b;
+
+                    return currentCursor.hasNext();
+                }
+            }
+
+            @Override
+            public T next() {
+                return currentCursor.next();
+            }
+        };
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
new file mode 100644
index 0000000000..36a65b6f84
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.isPrefix;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+
+/**
+ * {@link AbstractComparator} implementation that compares Binary Tuples.
+ */
+public class RocksDbBinaryTupleComparator extends AbstractComparator {
+    private final BinaryTupleComparator comparator;
+
+    /** Options needed for resource management. */
+    private final ComparatorOptions options;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Sorted Index descriptor.
+     */
+    public RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor) {
+        this(descriptor, new ComparatorOptions());
+    }
+
+    private RocksDbBinaryTupleComparator(SortedIndexDescriptor descriptor, ComparatorOptions options) {
+        super(options);
+
+        this.options = options;
+        this.comparator = new BinaryTupleComparator(descriptor);
+    }
+
+    @Override
+    public String name() {
+        return getClass().getCanonicalName();
+    }
+
+    @Override
+    public int compare(ByteBuffer a, ByteBuffer b) {
+        int comparePartitionId = Short.compare(a.getShort(), b.getShort());
+
+        if (comparePartitionId != 0) {
+            return comparePartitionId;
+        }
+
+        ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN);
+        ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN);
+
+        int compareTuples = comparator.compare(firstBinaryTupleBuffer, secondBinaryTupleBuffer);
+
+        // Binary Tuple Prefixes don't have row IDs, so they can't be compared.
+        if (compareTuples != 0 || isPrefix(firstBinaryTupleBuffer) || isPrefix(secondBinaryTupleBuffer)) {
+            return compareTuples;
+        }
+
+        return compareRowIds(a, b);
+    }
+
+    private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
+        long firstMostSignBits = a.getLong(a.limit() - Long.BYTES * 2);
+        long secondMostSignBits = b.getLong(b.limit() - Long.BYTES * 2);
+
+        int compare = Long.compare(firstMostSignBits, secondMostSignBits);
+
+        if (compare != 0) {
+            return compare;
+        }
+
+        long firstLeastSignBits = a.getLong(a.limit() - Long.BYTES);
+        long secondLeastSignBits = b.getLong(b.limit() - Long.BYTES);
+
+        return Long.compare(firstLeastSignBits, secondLeastSignBits);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+
+        options.close();
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index ee204fc9cf..bdc06a8a66 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.storage.rocksdb.index;
 
-import static org.apache.ignite.internal.rocksdb.RocksUtils.rangeEnd;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 
@@ -109,7 +109,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
     public Cursor<RowId> get(BinaryTuple key) {
         byte[] rangeStart = rocksPrefix(key);
 
-        byte[] rangeEnd = rangeEnd(rangeStart);
+        byte[] rangeEnd = incrementArray(rangeStart);
 
         Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
 
@@ -162,7 +162,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
 
     @Override
     public void destroy() {
-        byte[] rangeEnd = rangeEnd(constantPrefix);
+        byte[] rangeEnd = incrementArray(constantPrefix);
 
         assert rangeEnd != null;
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
new file mode 100644
index 0000000000..36321b8b6c
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * {@link SortedIndexStorage} implementation based on RocksDB.
+ *
+ * <p>This storage uses the following format for keys:
+ * <pre>
+ * Partition ID - 2 bytes
+ * Tuple value - variable length
+ * Row ID (UUID) - 16 bytes
+ * </pre>
+ *
+ * <p>We use an empty array as values, because all required information can be extracted from the key.
+ */
+public class RocksDbSortedIndexStorage implements SortedIndexStorage {
+    private final SortedIndexDescriptor descriptor;
+
+    private final ColumnFamily indexCf;
+
+    private final RocksDbMvPartitionStorage partitionStorage;
+
+    /**
+     * Creates a storage.
+     *
+     * @param descriptor Sorted Index descriptor.
+     * @param indexCf Column family that stores the index data.
+     * @param partitionStorage Partition storage of the corresponding index.
+     */
+    public RocksDbSortedIndexStorage(
+            SortedIndexDescriptor descriptor,
+            ColumnFamily indexCf,
+            RocksDbMvPartitionStorage partitionStorage
+    ) {
+        this.descriptor = descriptor;
+        this.indexCf = indexCf;
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public SortedIndexDescriptor indexDescriptor() {
+        return descriptor;
+    }
+
+    @Override
+    public void put(IndexRow row) {
+        WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+        try {
+            writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
+        } catch (RocksDBException e) {
+            throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
+        }
+    }
+
+    @Override
+    public void remove(IndexRow row) {
+        WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
+        try {
+            writeBatch.delete(indexCf.handle(), rocksKey(row));
+        } catch (RocksDBException e) {
+            throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
+        }
+    }
+
+    @Override
+    public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+        boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+        return scan(lowerBound, upperBound, includeLower, includeUpper);
+    }
+
+    private Cursor<IndexRow> scan(
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            boolean includeLower,
+            boolean includeUpper
+    ) {
+        byte[] lowerBoundBytes = lowerBound == null ? null : rocksPrefix(lowerBound);
+        byte[] upperBoundBytes = upperBound == null ? null : rocksPrefix(upperBound);
+
+        Cursor<ByteBuffer> cursor = createScanCursor(lowerBoundBytes, upperBoundBytes);
+
+        // Skip the lower bound, if needed (RocksDB includes the lower bound by default).
+        if (!includeLower && lowerBound != null) {
+            cursor = dropWhile(cursor, startsWith(lowerBound));
+        }
+
+        // Include the upper bound, if needed (RocksDB excludes the upper bound by default).
+        if (includeUpper && upperBound != null) {
+            Cursor<ByteBuffer> upperBoundCursor = takeWhile(createScanCursor(upperBoundBytes, null), startsWith(upperBound));
+
+            cursor = concat(cursor, upperBoundCursor);
+        }
+
+        return map(cursor, this::decodeRow);
+    }
+
+    private Cursor<ByteBuffer> createScanCursor(byte @Nullable [] lowerBound, byte @Nullable [] upperBound) {
+        Slice upperBoundSlice = upperBound == null ? null : new Slice(upperBound);
+
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+
+        RocksIterator it = indexCf.newIterator(options);
+
+        if (lowerBound == null) {
+            it.seekToFirst();
+        } else {
+            it.seek(lowerBound);
+        }
+
+        return new RocksIteratorAdapter<>(it) {
+            @Override
+            protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
+                return ByteBuffer.wrap(key).order(ByteOrder.BIG_ENDIAN);
+            }
+
+            @Override
+            public void close() throws Exception {
+                super.close();
+
+                IgniteUtils.closeAll(options, upperBoundSlice);
+            }
+        };
+    }
+
+    private IndexRow decodeRow(ByteBuffer bytes) {
+        assert bytes.getShort(0) == partitionStorage.partitionId();
+
+        var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), binaryTupleSlice(bytes));
+
+        // RowId UUID is located at the last 16 bytes of the key
+        long mostSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES * 2);
+        long leastSignificantBits = bytes.getLong(bytes.limit() - Long.BYTES);
+
+        var rowId = new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
+
+        return new IndexRowImpl(tuple, rowId);
+    }
+
+    private byte[] rocksPrefix(BinaryTuplePrefix prefix) {
+        return rocksPrefix(prefix, 0).array();
+    }
+
+    private ByteBuffer rocksPrefix(InternalTuple prefix, int extraLength) {
+        ByteBuffer keyBytes = prefix.byteBuffer();
+
+        return ByteBuffer.allocate(Short.BYTES + keyBytes.remaining() + extraLength)
+                .order(ByteOrder.BIG_ENDIAN)
+                .putShort((short) partitionStorage.partitionId())
+                .put(keyBytes);
+    }
+
+    private byte[] rocksKey(IndexRow row) {
+        RowId rowId = row.rowId();
+
+        // We don't store the Partition ID as it is already a part of the key.
+        return rocksPrefix(row.indexColumns(), 2 * Long.BYTES)
+                .putLong(rowId.mostSignificantBits())
+                .putLong(rowId.leastSignificantBits())
+                .array();
+    }
+
+    private Predicate<ByteBuffer> startsWith(BinaryTuplePrefix prefix) {
+        var comparator = new BinaryTupleComparator(descriptor);
+
+        return key -> {
+            // First, compare the partitionIDs.
+            boolean partitionIdCompare = key.getShort(0) == partitionStorage.partitionId();
+
+            if (!partitionIdCompare) {
+                return false;
+            }
+
+            // Finally, compare the remaining parts of the tuples.
+            // TODO: This part may be optimized by comparing binary tuple representations directly. However, currently BinaryTuple prefixes
+            //  are not binary compatible with regular tuples. See https://issues.apache.org/jira/browse/IGNITE-17711.
+            return comparator.compare(prefix.byteBuffer(), binaryTupleSlice(key)) == 0;
+        };
+    }
+
+    private static ByteBuffer binaryTupleSlice(ByteBuffer key) {
+        return key.duplicate()
+                // Discard partition ID.
+                .position(Short.BYTES)
+                // Discard row ID.
+                .limit(key.limit() - Long.BYTES * 2)
+                .slice()
+                .order(ByteOrder.LITTLE_ENDIAN);
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 467d59a703..2d247e6afe 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -163,22 +162,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
-    @Override
-    public void testCreateSortedIndex() {
-        super.testCreateSortedIndex();
-    }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
-    @Override
-    public void testDestroyIndex() {
-        super.testDestroyIndex();
-    }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
-    @Override
-    public void testMisconfiguredIndices() {
-        super.testMisconfiguredIndices();
-    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java
new file mode 100644
index 0000000000..9e5c6be715
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/CursorUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.concat;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.dropWhile;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.map;
+import static org.apache.ignite.internal.storage.rocksdb.index.CursorUtils.takeWhile;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link CursorUtils}.
+ */
+public class CursorUtilsTest {
+    @Test
+    public void testDropWhile() {
+        Cursor<Integer> actual = dropWhile(cursor(1, 2, 5, 14, 20, 1), i -> i < 10);
+
+        assertThat(actual, contains(14, 20, 1));
+
+        assertThat(dropWhile(cursor(), i -> i.hashCode() < 10), is(emptyIterable()));
+    }
+
+    @Test
+    public void testTakeWhile() {
+        Cursor<Integer> actual = takeWhile(cursor(1, 2, 5, 14, 20, 1), i -> i < 10);
+
+        assertThat(actual, contains(1, 2, 5));
+
+        assertThat(takeWhile(cursor(), i -> i.hashCode() < 10), is(emptyIterable()));
+    }
+
+    @Test
+    public void testMap() {
+        Cursor<String> actual = map(cursor(1, 2, 5, 14, 20), i -> "foo" + i);
+
+        assertThat(actual, contains("foo1", "foo2", "foo5", "foo14", "foo20"));
+
+        assertThat(map(cursor(), Object::toString), is(emptyIterable()));
+    }
+
+    @Test
+    public void testConcat() {
+        Cursor<Integer> actual = concat(cursor(1, 2, 5), cursor(5, 2, 1));
+
+        assertThat(actual, contains(1, 2, 5, 5, 2, 1));
+
+        assertThat(concat(cursor(), cursor()), is(emptyIterable()));
+    }
+
+    @Test
+    public void testCombination() {
+        Cursor<Integer> dropWhile = dropWhile(cursor(1, 5, 8, 10, 42), i -> i <= 8);
+
+        Cursor<Integer> takeWhile = takeWhile(dropWhile, i -> i >= 10);
+
+        Cursor<Integer> concat = concat(takeWhile, cursor(1, 2, 3));
+
+        Cursor<String> map = map(concat, String::valueOf);
+
+        assertThat(map, contains("10", "42", "1", "2", "3"));
+    }
+
+    @SafeVarargs
+    private static <T> Cursor<T> cursor(T... elements) {
+        return Cursor.fromIterator(Arrays.asList(elements).iterator());
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index 2839e05c05..0db33a89b6 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import java.nio.file.Path;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -64,17 +61,17 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
                             UnlimitedBudgetConfigurationSchema.class
                     },
                     value = "mock.dataStorage.name = " + RocksDbStorageEngine.ENGINE_NAME
-            ) TableConfiguration tableCfg,
-
-            @InjectConfiguration(polymorphicExtensions = {
-                    HashIndexConfigurationSchema.class,
-                    UnknownDataStorageConfigurationSchema.class,
-                    ConstantValueDefaultConfigurationSchema.class,
-                    FunctionCallDefaultConfigurationSchema.class,
-                    NullValueDefaultConfigurationSchema.class,
-                    UnlimitedBudgetConfigurationSchema.class,
-                    EntryCountBudgetConfigurationSchema.class
-            }) TablesConfiguration tablesConfig
+            )
+            TableConfiguration tableCfg,
+            @InjectConfiguration(
+                    polymorphicExtensions = {
+                            HashIndexConfigurationSchema.class,
+                            UnknownDataStorageConfigurationSchema.class,
+                            NullValueDefaultConfigurationSchema.class,
+                            UnlimitedBudgetConfigurationSchema.class
+                    }
+            )
+            TablesConfiguration tablesConfig
     ) {
         engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
similarity index 73%
copy from modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
copy to modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 2839e05c05..f46c29e5e4 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -19,17 +19,14 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import java.nio.file.Path;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.index.AbstractHashIndexStorageTest;
+import org.apache.ignite.internal.storage.index.AbstractSortedIndexStorageTest;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbTableStorage;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
@@ -42,11 +39,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * Tests for the {@link RocksDbHashIndexStorage} class.
+ * Tests for the {@link RocksDbSortedIndexStorage} class.
  */
 @ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
-public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
+public class RocksDbSortedIndexStorageTest extends AbstractSortedIndexStorageTest {
     private RocksDbStorageEngine engine;
 
     private RocksDbTableStorage tableStorage;
@@ -59,22 +56,22 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
             @InjectConfiguration(
                     polymorphicExtensions = {
                             RocksDbDataStorageConfigurationSchema.class,
-                            HashIndexConfigurationSchema.class,
+                            SortedIndexConfigurationSchema.class,
                             NullValueDefaultConfigurationSchema.class,
                             UnlimitedBudgetConfigurationSchema.class
                     },
                     value = "mock.dataStorage.name = " + RocksDbStorageEngine.ENGINE_NAME
-            ) TableConfiguration tableCfg,
-
-            @InjectConfiguration(polymorphicExtensions = {
-                    HashIndexConfigurationSchema.class,
-                    UnknownDataStorageConfigurationSchema.class,
-                    ConstantValueDefaultConfigurationSchema.class,
-                    FunctionCallDefaultConfigurationSchema.class,
-                    NullValueDefaultConfigurationSchema.class,
-                    UnlimitedBudgetConfigurationSchema.class,
-                    EntryCountBudgetConfigurationSchema.class
-            }) TablesConfiguration tablesConfig
+            )
+            TableConfiguration tableCfg,
+            @InjectConfiguration(
+                    polymorphicExtensions = {
+                            SortedIndexConfigurationSchema.class,
+                            UnknownDataStorageConfigurationSchema.class,
+                            NullValueDefaultConfigurationSchema.class,
+                            UnlimitedBudgetConfigurationSchema.class
+                    }
+            )
+            TablesConfiguration tablesConfig
     ) {
         engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);