You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/02/07 16:45:48 UTC

[ignite-3] branch ignite-14925-sorted-indexes updated: IGNITE-14830 Introduce sorted index prototype (#518)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch ignite-14925-sorted-indexes
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-14925-sorted-indexes by this push:
     new 99fe791  IGNITE-14830 Introduce sorted index prototype (#518)
99fe791 is described below

commit 99fe79148b62652cd9d19999be55a13ef491b90f
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Mon Feb 7 19:45:22 2022 +0300

    IGNITE-14830 Introduce sorted index prototype (#518)
---
 .../{Producer.java => AbstractProducer.java}       |  28 +-
 .../org/apache/ignite/internal/manager/Event.java  |   2 +-
 .../ignite/internal/manager/EventParameters.java   |   2 +-
 .../apache/ignite/internal/manager/Producer.java   |  49 +-
 modules/index-api/pom.xml                          |  91 ++++
 .../apache/ignite/internal/idx/IndexManager.java   | 105 +++++
 .../ignite/internal/idx/InternalSortedIndex.java   |  73 +++
 .../internal/idx/SortedIndexColumnCollation.java   |  88 ++++
 .../internal/idx/SortedIndexColumnDescriptor.java  |  72 +++
 .../ignite/internal/idx/SortedIndexDescriptor.java |  70 +++
 .../ignite/internal/idx/event/IndexEvent.java}     |  15 +-
 .../internal/idx/event/IndexEventParameters.java   |  81 ++++
 modules/index/pom.xml                              | 135 ++++++
 .../ignite/internal/idx/IndexManagerImpl.java      | 513 +++++++++++++++++++++
 .../internal/idx/InternalSortedIndexImpl.java      | 313 +++++++++++++
 modules/runner/pom.xml                             |  10 +
 .../internal/runner/app/ItTablesApiTest.java       |   7 +-
 .../ignite/internal/sql/engine/ItIndexDdlTest.java | 112 +++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  20 +-
 .../apache/ignite/internal/schema/BinaryRow.java   |   6 +
 .../ignite/internal/schema/ByteBufferRow.java      |  20 +
 .../org/apache/ignite/internal/schema/Column.java  |  10 +
 .../builder/SortedIndexDefinitionBuilderImpl.java  |   4 +-
 .../org/apache/ignite/internal/schema/row/Row.java |   6 +
 modules/sql-engine/pom.xml                         |  11 +
 .../internal/sql/engine/SqlQueryProcessor.java     | 139 +++++-
 .../sql/engine/exec/AbstractIndexScan.java         |   4 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   7 +-
 .../ignite/internal/sql/engine/exec/IndexScan.java | 145 ++++++
 .../sql/engine/exec/LogicalRelImplementor.java     |  39 +-
 .../sql/engine/exec/RuntimeSortedIndex.java        |  16 +-
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |  45 +-
 .../sql/engine/exec/rel/IndexSpoolNode.java        |   2 +-
 .../FilterSpoolMergeToSortedIndexSpoolRule.java    |   3 +-
 .../internal/sql/engine/schema/IgniteIndex.java    |  56 ++-
 .../internal/sql/engine/schema/IgniteSchema.java   |  10 +
 .../sql/engine/schema/IgniteTableImpl.java         |  10 +
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |  81 +++-
 .../internal/sql/engine/trait/TraitUtils.java      |   4 +-
 .../internal/sql/engine/util/IndexConditions.java  |  27 +-
 .../ignite/internal/sql/engine/util/RexUtils.java  |  40 +-
 .../internal/sql/engine/StopCalciteModuleTest.java |   6 +-
 .../sql/engine/exec/MockedStructuresTest.java      |  52 +--
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   1 +
 .../CorrelatedNestedLoopJoinPlannerTest.java       |  15 +-
 .../engine/planner/HashIndexSpoolPlannerTest.java  |  19 +-
 .../internal/sql/engine/planner/PlannerTest.java   |   2 +-
 .../planner/SortedIndexSpoolPlannerTest.java       |  33 +-
 modules/storage-api/pom.xml                        |   5 +
 .../storage/engine/StorageRowListener.java}        |  55 ++-
 .../internal/storage/engine/TableStorage.java      |   5 +-
 .../ignite/internal/storage/index/IndexRow.java    |  24 +-
 .../internal/storage/index/IndexRowPrefix.java     |  17 +-
 .../storage/index/IndexSchemaDescriptor.java}      |  23 +-
 .../storage/index/SortedIndexDescriptor.java       | 206 ---------
 .../internal/storage/index/SortedIndexStorage.java |  19 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  34 +-
 .../rocksdb/index/BinaryIndexRowDeserializer.java  |  48 --
 .../rocksdb/index/BinaryIndexRowFactory.java       |  92 ----
 .../rocksdb/index/BinaryIndexRowSerializer.java    |  72 +++
 .../storage/rocksdb/index/BinaryRowComparator.java |  13 +-
 .../storage/rocksdb/index/IndexBinaryRow.java}     |  18 +-
 ...BinaryIndexRow.java => IndexBinaryRowImpl.java} |  49 +-
 .../rocksdb}/index/IndexRowDeserializer.java       |   8 +-
 .../storage/rocksdb/index/IndexRowImpl.java        | 117 +++++
 .../storage/rocksdb/index/IndexRowSerializer.java} |  12 +-
 .../storage/rocksdb/index/PrefixComparator.java    |  21 +-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  53 +--
 .../index/SortedIndexStorageDescriptor.java        |  83 ++++
 .../storage/rocksdb/index/IndexRowWrapper.java     |  69 +--
 .../index/RocksDbSortedIndexStorageTest.java       | 290 ++++--------
 .../storage/rocksdb/index/TestIndexRow.java}       |  45 +-
 .../ignite/internal/table/StorageRowListener.java} |  55 ++-
 .../apache/ignite/internal/table/TableImpl.java    |  27 +-
 .../internal/table/TableStorageRowListener.java    |  51 ++
 .../internal/table/distributed/TableManager.java   |  16 +-
 .../distributed/storage/VersionedRowStore.java     |  21 +
 parent/pom.xml                                     |  12 +
 pom.xml                                            |   2 +
 79 files changed, 3121 insertions(+), 1040 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
similarity index 79%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
index 665b9d9..8a86a30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
@@ -26,38 +26,24 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Interface which can produce its events.
  */
-public abstract class Producer<T extends Event, P extends EventParameters> {
+public abstract class AbstractProducer<T extends Event, P extends EventParameters> implements Producer<T, P> {
     /** All listeners. */
     private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();
 
-    /**
-     * Registers an event listener. When the event predicate returns true it would never invoke after, otherwise this predicate would
-     * receive an event again.
-     *
-     * @param evt     Event.
-     * @param closure Closure.
-     */
+    /** {@inheritDoc} */
+    @Override
     public void listen(T evt, EventListener<P> closure) {
         listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
     }
 
-    /**
-     * Removes a listener associated with the event.
-     *
-     * @param evt     Event.
-     * @param closure Closure.
-     */
+    /** {@inheritDoc} */
+    @Override
     public void removeListener(T evt, EventListener<P> closure) {
         removeListener(evt, closure, null);
     }
 
-    /**
-     * Removes a listener associated with the event.
-     *
-     * @param evt     Event.
-     * @param closure Closure.
-     * @param cause   The exception that was a cause which a listener is removed.
-     */
+    /** {@inheritDoc} */
+    @Override
     public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
         if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
             closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
index 9d7ca85..e335a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
 /**
  * The event cas which is produced by event producer component.
  *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * @see Producer#listen(Event, EventListener)
  */
 public interface Event {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
index 0d230df..f06669a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
 /**
  * Event parameters. This type passed to the event listener.
  *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * @see AbstractProducer#fireEvent(Event, EventParameters, Throwable)
  */
 public interface EventParameters {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index 665b9d9..04b9e43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -17,19 +17,13 @@
 
 package org.apache.ignite.internal.manager;
 
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Interface which can produce its events.
  */
-public abstract class Producer<T extends Event, P extends EventParameters> {
-    /** All listeners. */
-    private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();
-
+public interface Producer<T extends Event, P extends EventParameters> {
     /**
      * Registers an event listener. When the event predicate returns true it would never invoke after, otherwise this predicate would
      * receive an event again.
@@ -37,9 +31,7 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param evt     Event.
      * @param closure Closure.
      */
-    public void listen(T evt, EventListener<P> closure) {
-        listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
-    }
+    void listen(T evt, EventListener<P> closure);
 
     /**
      * Removes a listener associated with the event.
@@ -47,9 +39,7 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param evt     Event.
      * @param closure Closure.
      */
-    public void removeListener(T evt, EventListener<P> closure) {
-        removeListener(evt, closure, null);
-    }
+    void removeListener(T evt, EventListener<P> closure);
 
     /**
      * Removes a listener associated with the event.
@@ -58,36 +48,5 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param closure Closure.
      * @param cause   The exception that was a cause which a listener is removed.
      */
-    public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
-        if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
-            closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
-        }
-    }
-
-    /**
-     * Notifies every listener that subscribed before.
-     *
-     * @param evt    Event type.
-     * @param params Event parameters.
-     * @param err    Exception when it was happened, or {@code null} otherwise.
-     */
-    protected void fireEvent(T evt, P params, Throwable err) {
-        ConcurrentLinkedQueue<EventListener<P>> queue = listeners.get(evt);
-
-        if (queue == null) {
-            return;
-        }
-
-        EventListener<P> closure;
-
-        Iterator<EventListener<P>> iter = queue.iterator();
-
-        while (iter.hasNext()) {
-            closure = iter.next();
-
-            if (closure.notify(params, err)) {
-                iter.remove();
-            }
-        }
-    }
+    void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause);
 }
diff --git a/modules/index-api/pom.xml b/modules/index-api/pom.xml
new file mode 100644
index 0000000..cab2544
--- /dev/null
+++ b/modules/index-api/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.ignite</groupId>
+    <artifactId>ignite-parent</artifactId>
+    <version>1</version>
+    <relativePath>../../parent/pom.xml</relativePath>
+  </parent>
+
+  <artifactId>ignite-index-api</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-schema</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-api</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration-annotation-processor</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <annotationProcessorPaths>
+            <path>
+              <groupId>org.apache.ignite</groupId>
+              <artifactId>ignite-configuration-annotation-processor</artifactId>
+              <version>${project.version}</version>
+            </path>
+          </annotationProcessorPaths>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java
new file mode 100644
index 0000000..f1a1950
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
+import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Internal index manager facade provides low-level methods for indexes operations.
+ */
+public interface IndexManager extends Producer<IndexEvent, IndexEventParameters> {
+    /**
+     * Creates a new index with the specified name.
+     *
+     * @param idxCanonicalName Index canonical name.
+     * @param tblCanonicalName Table canonical name.
+     * @param idxChange        Index configuration.
+     * @return Index.
+     * @throws IndexAlreadyExistsException if the index exists.
+     */
+    InternalSortedIndex createIndex(
+            String idxCanonicalName,
+            String tblCanonicalName,
+            Consumer<TableIndexChange> idxChange
+    );
+
+    /**
+     * Create index asynchronously.
+     *
+     * @param idxCanonicalName Index canonical name.
+     * @param tblCanonicalName Table canonical name.
+     * @param idxChange        Index configuration.
+     * @return Index future, that may be completed exceptionally with {@link IndexAlreadyExistsException} if the index exists.
+     */
+    CompletableFuture<InternalSortedIndex> createIndexAsync(
+            String idxCanonicalName,
+            String tblCanonicalName,
+            Consumer<TableIndexChange> idxChange
+    );
+
+    /**
+     * Drop index.
+     *
+     * @param idxCanonicalName Index canonical name.
+     * @throws IndexNotFoundException if the index doesn't exist.
+     */
+    void dropIndex(String idxCanonicalName);
+
+    /**
+     * Drop index asynchronously.
+     *
+     * @param idxCanonicalName Index canonical name.
+     * @return Future that may be completed exceptionally with {@link IndexNotFoundException} if the index doesn't exist.
+     */
+    CompletableFuture<Void> dropIndexAsync(String idxCanonicalName);
+
+    /**
+     * Gets indexes of the table.
+     *
+     * @param tblId Table identifier to lookup indexes.
+     * @return Indexes of the table.
+     * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+     */
+    List<InternalSortedIndex> indexes(UUID tblId);
+
+    /**
+     * Gets index of the table.
+     *
+     * @return Index of the table.
+     * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+     */
+    InternalSortedIndex index(String idxCanonicalName);
+
+    /**
+     * Gets index of the table asynchronously.
+     *
+     * @return Index future.
+     * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+     */
+    CompletableFuture<InternalSortedIndex> indexAsync(String idxCanonicalName);
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java
new file mode 100644
index 0000000..ba9b54c
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Sorted index facade.
+ */
+public interface InternalSortedIndex {
+    /** Exclude lower bound. */
+    byte GREATER = 0;
+
+    /** Include lower bound. */
+    byte GREATER_OR_EQUAL = 1;
+
+    /** Exclude upper bound. */
+    byte LESS = 0;
+
+    /** Include upper bound. */
+    byte LESS_OR_EQUAL = 1 << 1;
+
+    /**
+     * Return index name.
+     *
+     * @return Index name.
+     */
+    String name();
+
+    /**
+     * Return indexed table.
+     *
+     * @return Indexed table identifier.
+     */
+    UUID tableId();
+
+    /**
+     * Return index descriptor.
+     */
+    SortedIndexDescriptor descriptor();
+
+    /**
+     * Return rows between lower and upper bounds. Result rows are filled with the fields specified in the projection set.
+     *
+     * @param low           Lower bound of the scan.
+     * @param up            Upper bound of the scan.
+     * @param scanBoundMask Scan bound mask (specify how to work with rows equals to the bounds: include or exclude).
+     */
+    Cursor<Tuple> scan(Tuple low, Tuple up, byte scanBoundMask, BitSet proj);
+
+    /**
+     * Drop index.
+     */
+    void drop();
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java
new file mode 100644
index 0000000..59644b7
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Index column collation: direction and NULLs order that a column's value is ordered in.
+ */
+public class SortedIndexColumnCollation {
+    private final boolean asc;
+
+    private final boolean nullsFirst;
+
+    /**
+     * Constructor.
+     *
+     * @param asc {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+     */
+    public SortedIndexColumnCollation(
+            boolean asc
+    ) {
+        this(asc, false);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param asc {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+     * @param nullsFirst NULLs ordering: {@code true} if NULLs are placed before all other values, {@code false} otherwise.
+     */
+    public SortedIndexColumnCollation(
+            boolean asc,
+            boolean nullsFirst
+    ) {
+        this.asc = asc;
+        this.nullsFirst = nullsFirst;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SortedIndexColumnCollation that = (SortedIndexColumnCollation) o;
+        return asc == that.asc && nullsFirst == that.nullsFirst;
+    }
+
+    public boolean asc() {
+        return asc;
+    }
+
+    public boolean isNullsFirst() {
+        return nullsFirst;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(asc, nullsFirst);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(SortedIndexColumnCollation.class, this);
+    }
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java
new file mode 100644
index 0000000..382aa1a
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Descriptor of a Sorted Index column (column name and column sort order).
+ */
+public class SortedIndexColumnDescriptor {
+    private final Column column;
+
+    private final SortedIndexColumnCollation collation;
+
+    public SortedIndexColumnDescriptor(Column column, boolean asc) {
+        this(column, new SortedIndexColumnCollation(asc));
+    }
+
+    public SortedIndexColumnDescriptor(Column column, SortedIndexColumnCollation collation) {
+        this.column = column;
+        this.collation = collation;
+    }
+
+    /**
+     * Returns a column descriptor.
+     */
+    public Column column() {
+        return column;
+    }
+
+    /**
+     * Returns column collation.
+     */
+    public SortedIndexColumnCollation collation() {
+        return collation;
+    }
+
+    /**
+     * Returns {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+     */
+    public boolean asc() {
+        return collation.asc();
+    }
+
+    /**
+     * Returns {@code true} if this column can contain null values or {@code false} otherwise.
+     */
+    public boolean nullable() {
+        return column.nullable();
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java
new file mode 100644
index 0000000..b400e10
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.List;
+import org.apache.ignite.internal.schema.Column;
+
+/**
+ * Descriptor for creating a Sorted Index Storage.
+ */
+public class SortedIndexDescriptor {
+    private final String name;
+
+    private final List<SortedIndexColumnDescriptor> columns;
+
+    private final Column[] pkColumns;
+
+    /**
+     * Creates an Index Descriptor from a given Table Configuration.
+     *
+     * @param name        Index name.
+     * @param columns     Indexed columns (configured columns + primary key columns).
+     * @param pkColumns   Pk columns.
+     */
+    public SortedIndexDescriptor(
+            String name,
+            final List<SortedIndexColumnDescriptor> columns,
+            Column[] pkColumns) {
+        this.name = name;
+        this.pkColumns = pkColumns;
+
+        this.columns = columns;
+    }
+
+    /**
+     * Returns the name of this index.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the Column Descriptors that comprise a row of this index (indexed columns + primary key columns).
+     */
+    public List<SortedIndexColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns primary key columns.
+     */
+    public Column[] pkColumns() {
+        return pkColumns;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
similarity index 72%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
index 9d7ca85..11e998f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.idx.event;
+
+import org.apache.ignite.internal.manager.Event;
 
 /**
- * The event cas which is produced by event producer component.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * Index management events.
  */
-public interface Event {
+public enum IndexEvent implements Event {
+    /** Fired when an index is created. */
+    CREATE,
+
+    /** Fired when an index is dropped. */
+    DROP
 }
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java
new file mode 100644
index 0000000..3b286a5
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx.event;
+
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index event parameters. These are the properties associated with a concrete index.
+ */
+public class IndexEventParameters implements EventParameters {
+    /** Index name. */
+    private final String idxName;
+
+    /** Indexed table name. */
+    private final String tblName;
+
+    /** Index instance, or {@code null} when the parameters were created from names only. */
+    private final @Nullable InternalSortedIndex idx;
+
+    /**
+     * Creates parameters that carry an index instance. The index name is taken from the instance.
+     *
+     * @param tblName Table name.
+     * @param idx Index instance, must not be {@code null}.
+     */
+    public IndexEventParameters(String tblName, InternalSortedIndex idx) {
+        this(idx.name(), tblName, idx);
+    }
+
+    /**
+     * Creates parameters that carry only names; {@link #index()} returns {@code null} for them.
+     *
+     * @param idxName Index name.
+     * @param tblName Table name.
+     */
+    public IndexEventParameters(String idxName, String tblName) {
+        this(idxName, tblName, null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param idxName Index name.
+     * @param tblName Table name.
+     * @param idx     Index instance, may be {@code null}.
+     */
+    private IndexEventParameters(String idxName, String tblName, @Nullable InternalSortedIndex idx) {
+        this.idxName = idxName;
+        this.tblName = tblName;
+        this.idx = idx;
+    }
+
+    /**
+     * Returns the index name.
+     */
+    public String indexName() {
+        return idxName;
+    }
+
+    /**
+     * Returns the name of the indexed table.
+     */
+    public String tableName() {
+        return tblName;
+    }
+
+    /**
+     * Returns the index instance, or {@code null} if the parameters were created without one.
+     */
+    public @Nullable InternalSortedIndex index() {
+        return idx;
+    }
+}
diff --git a/modules/index/pom.xml b/modules/index/pom.xml
new file mode 100644
index 0000000..6314307
--- /dev/null
+++ b/modules/index/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.ignite</groupId>
+    <artifactId>ignite-parent</artifactId>
+    <version>1</version>
+    <relativePath>../../parent/pom.xml</relativePath>
+  </parent>
+
+  <artifactId>ignite-index</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-transactions</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-index-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-schema</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-storage-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-table</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-configuration</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Logging in tests -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+    </plugins>
+  </build>
+</project>
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java
new file mode 100644
index 0000000..ebada67
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.directProxy;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.configuration.ConfigurationChangeException;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.table.IndexColumnView;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
+import org.apache.ignite.internal.manager.AbstractProducer;
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal index manager facade provides low-level methods for indexes operations.
+ */
+public class IndexManagerImpl extends AbstractProducer<IndexEvent, IndexEventParameters> implements IndexManager, IgniteComponent {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(IndexManagerImpl.class);
+
+    private final TablesConfiguration tablesCfg;
+
+    private final TableManager tblMgr;
+
+    /** Indexes by canonical name. */
+    private final Map<String, InternalSortedIndex> idxsByName = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Creates the index manager.
+     *
+     * @param tblMgr    Table manager.
+     * @param tablesCfg Tables configuration.
+     */
+    public IndexManagerImpl(TableManager tblMgr, TablesConfiguration tablesCfg) {
+        this.tablesCfg = tablesCfg;
+        this.tblMgr = tblMgr;
+    }
+
+    /**
+     * Subscribes to changes of the index configuration of every table.
+     *
+     * <p>Index creation and removal are handled under the busy lock. Every failure is reported twice: as an
+     * {@link IndexEvent} with an error for local listeners, and as an exceptionally completed future for the configuration
+     * notification, so that the failure is not mistaken for a successful handling.
+     */
+    @Override
+    public void start() {
+        tablesCfg.tables().any().indices().listenElements(
+                new ConfigurationNamedListListener<>() {
+                    @Override
+                    public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+                        TableConfiguration tbl = ctx.config(TableConfiguration.class);
+                        String tblName = tbl.name().value();
+                        String idxName = ctx.newValue().name();
+
+                        if (!busyLock.enterBusy()) {
+                            NodeStoppingException stopEx = new NodeStoppingException();
+
+                            fireEvent(IndexEvent.CREATE, new IndexEventParameters(idxName, tblName), stopEx);
+
+                            // A future completed normally with an exception as its value would signal success.
+                            return CompletableFuture.failedFuture(stopEx);
+                        }
+                        try {
+                            onIndexCreate(ctx);
+                        } catch (Exception e) {
+                            fireEvent(IndexEvent.CREATE, new IndexEventParameters(idxName, tblName), e);
+
+                            LOG.error("Internal error, index creation failed [name={}, table={}]", e, idxName, tblName);
+
+                            return CompletableFuture.failedFuture(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    @Override
+                    public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+                            @NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+                        // TODO: IGNITE-16196 Supports index rename
+                        return ConfigurationNamedListListener.super.onRename(oldName, newName, ctx);
+                    }
+
+                    @Override
+                    public @NotNull CompletableFuture<?> onDelete(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+                        TableConfiguration tbl = ctx.config(TableConfiguration.class);
+                        String tblName = tbl.name().value();
+                        String idxName = ctx.oldValue().name();
+
+                        if (!busyLock.enterBusy()) {
+                            NodeStoppingException stopEx = new NodeStoppingException();
+
+                            fireEvent(IndexEvent.DROP, new IndexEventParameters(idxName, tblName), stopEx);
+
+                            return CompletableFuture.failedFuture(stopEx);
+                        }
+                        try {
+                            onIndexDrop(ctx);
+                        } catch (Exception e) {
+                            // Same error handling as for index creation.
+                            fireEvent(IndexEvent.DROP, new IndexEventParameters(idxName, tblName), e);
+
+                            LOG.error("Internal error, index drop failed [name={}, table={}]", e, idxName, tblName);
+
+                            return CompletableFuture.failedFuture(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    @Override
+                    public @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+                        assert false : "Index cannot be updated [ctx=" + ctx + ']';
+
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
+    /**
+     * Handles removal of an index from the configuration: unregisters the index, drops it and fires {@link IndexEvent#DROP}.
+     *
+     * @param ctx Configuration notification that describes the removed index.
+     */
+    private void onIndexDrop(ConfigurationNotificationEvent<TableIndexView> ctx) {
+        TableConfiguration tbl = ctx.config(TableConfiguration.class);
+        String tblName = tbl.name().value();
+        String idxName = ctx.oldValue().name();
+
+        InternalSortedIndex idx = idxsByName.remove(idxName);
+
+        // The index may be absent from the local registry (e.g. its local creation has failed), there is nothing to drop then.
+        if (idx != null) {
+            idx.drop();
+        }
+
+        // Indexes are registered under their own name, so the configured name identifies the same index.
+        fireEvent(IndexEvent.DROP, new IndexEventParameters(idxName, tblName), null);
+    }
+
+    /**
+     * Handles appearance of a new index in the configuration: resolves the owning table and creates the index locally.
+     *
+     * @param ctx Configuration notification that describes the new index.
+     * @throws NodeStoppingException Declared by the table lookup.
+     */
+    private void onIndexCreate(ConfigurationNotificationEvent<TableIndexView> ctx) throws NodeStoppingException {
+        TableIndexView newIdxView = ctx.newValue();
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-15916
+        assert newIdxView instanceof SortedIndexView : "Unsupported index type: " + newIdxView;
+
+        ExtendedTableConfiguration extTblCfg = ctx.config(TableConfiguration.class);
+
+        createIndexLocally(tblMgr.table(extTblCfg.id().value()), (SortedIndexView) newIdxView);
+    }
+
+    /**
+     * Stops the component. Only the first call blocks the busy lock, repeated calls do nothing.
+     */
+    @Override
+    public void stop() throws Exception {
+        if (stopGuard.compareAndSet(false, true)) {
+            busyLock.block();
+        }
+    }
+
+    /**
+     * Returns the indexes registered on this node that belong to the given table.
+     *
+     * @param tblId Table id.
+     * @return Indexes of the table, an empty list if there are none. Never {@code null}.
+     */
+    @Override
+    public List<InternalSortedIndex> indexes(UUID tblId) {
+        return idxsByName.values().stream()
+                .filter(idx -> tblId.equals(idx.tableId()))
+                .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public InternalSortedIndex createIndex(
+            String idxCanonicalName,
+            String tblCanonicalName,
+            Consumer<TableIndexChange> idxChange
+    ) {
+        // Synchronous variant: starts the asynchronous creation and waits for its result.
+        CompletableFuture<InternalSortedIndex> createFut = createIndexAsync(idxCanonicalName, tblCanonicalName, idxChange);
+
+        return join(createFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void dropIndex(String idxCanonicalName) {
+        // Synchronous variant: starts the asynchronous drop and waits for its completion.
+        CompletableFuture<Void> dropFut = dropIndexAsync(idxCanonicalName);
+
+        join(dropFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> dropIndexAsync(String idxCanonicalName) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+        try {
+            CompletableFuture<Void> idxFut = new CompletableFuture<>();
+
+            // whenComplete is used instead of thenAccept so that a failed lookup completes the result as well.
+            indexAsyncInternal(idxCanonicalName).whenComplete((idx, idxErr) -> {
+                if (idxErr != null) {
+                    idxFut.completeExceptionally(getRootCause(idxErr));
+
+                    return;
+                }
+
+                if (idx == null) {
+                    idxFut.completeExceptionally(new IndexNotFoundException(idxCanonicalName));
+
+                    return;
+                }
+
+                try {
+                    tblMgr.tableAsync(idx.tableId()).whenComplete((tbl, tblErr) -> {
+                        if (tblErr != null) {
+                            idxFut.completeExceptionally(getRootCause(tblErr));
+
+                            return;
+                        }
+
+                        if (tbl == null) {
+                            // The owning table cannot be found, so there is no configuration to remove the index from.
+                            idxFut.completeExceptionally(new IndexNotFoundException(idxCanonicalName));
+
+                            return;
+                        }
+
+                        tablesCfg.tables().change(ch -> ch.createOrUpdate(
+                                        tbl.name(),
+                                        tblCh -> tblCh.changeIndices(idxes -> idxes.delete(idxCanonicalName))
+                                ))
+                                .whenComplete((res, t) -> {
+                                    if (t != null) {
+                                        Throwable ex = getRootCause(t);
+
+                                        if (!(ex instanceof IndexNotFoundException)) {
+                                            LOG.error("Index wasn't dropped [name={}]", ex, idxCanonicalName);
+                                        }
+
+                                        idxFut.completeExceptionally(ex);
+                                    } else {
+                                        idxFut.complete(null);
+                                    }
+                                });
+                    });
+                } catch (NodeStoppingException ex) {
+                    idxFut.completeExceptionally(ex);
+                }
+            });
+
+            return idxFut;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<InternalSortedIndex> createIndexAsync(
+            String idxCanonicalName,
+            String tblCanonicalName,
+            Consumer<TableIndexChange> idxChange
+    ) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+        try {
+            CompletableFuture<InternalSortedIndex> idxFut = new CompletableFuture<>();
+
+            // whenComplete is used instead of thenAccept so that a failed lookup completes the result as well.
+            tblMgr.tableAsync(tblCanonicalName).whenComplete((tbl, tblErr) -> {
+                if (tblErr != null) {
+                    idxFut.completeExceptionally(getRootCause(tblErr));
+
+                    return;
+                }
+
+                if (tbl == null) {
+                    idxFut.completeExceptionally(new TableNotFoundException(tblCanonicalName));
+
+                    // Nothing to alter: the table does not exist.
+                    return;
+                }
+
+                indexAsyncInternal(idxCanonicalName).whenComplete((idx, idxErr) -> {
+                    if (idxErr != null) {
+                        idxFut.completeExceptionally(getRootCause(idxErr));
+
+                        return;
+                    }
+
+                    if (idx != null) {
+                        idxFut.completeExceptionally(new IndexAlreadyExistsException(idxCanonicalName));
+
+                        // The configuration must not be touched when the index is already known.
+                        return;
+                    }
+
+                    tblMgr.alterTableAsync(tblCanonicalName, chng -> chng.changeIndices(idxes -> {
+                                if (idxes.get(idxCanonicalName) != null) {
+                                    idxFut.completeExceptionally(new IndexAlreadyExistsException(idxCanonicalName));
+
+                                    // Leave the configuration as is instead of trying to create a duplicate.
+                                    return;
+                                }
+
+                                idxes.create(idxCanonicalName, idxChange::accept);
+                            }
+                    )).whenComplete((res, t) -> {
+                        if (t != null) {
+                            Throwable ex = getRootCause(t);
+
+                            if (!(ex instanceof IndexAlreadyExistsException)) {
+                                LOG.error("Index wasn't created [name={}]", ex, idxCanonicalName);
+                            }
+
+                            idxFut.completeExceptionally(ex);
+                        } else {
+                            // No effect if the result has already been completed with an error above.
+                            idxFut.complete(idxsByName.get(idxCanonicalName));
+                        }
+                    });
+                });
+            });
+
+            return idxFut;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public InternalSortedIndex index(String idxCanonicalName) {
+        // Synchronous variant: starts the asynchronous lookup and waits for its result.
+        CompletableFuture<InternalSortedIndex> lookupFut = indexAsync(idxCanonicalName);
+
+        return join(lookupFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<InternalSortedIndex> indexAsync(String idxCanonicalName) {
+        if (busyLock.enterBusy()) {
+            try {
+                return indexAsyncInternal(idxCanonicalName);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        // The busy lock could not be entered.
+        throw new IgniteException(new NodeStoppingException());
+    }
+
+    /**
+     * Looks up an index by name without taking the busy lock.
+     *
+     * <p>If the index is already registered locally, it is returned right away. Otherwise a listener for
+     * {@link IndexEvent#CREATE} is registered and the local registry is checked once more. The returned future is completed
+     * with {@code null} if the index is neither registered nor present in the configuration; if it is configured but not yet
+     * registered, the future is completed by the listener.
+     *
+     * @param idxCanonicalName Index name.
+     * @return Future of the index, or of {@code null} if there is no such index.
+     */
+    private CompletableFuture<InternalSortedIndex> indexAsyncInternal(String idxCanonicalName) {
+        var idx = idxsByName.get(idxCanonicalName);
+
+        if (idx != null) {
+            return CompletableFuture.completedFuture(idx);
+        }
+
+        CompletableFuture<InternalSortedIndex> getIdxFut = new CompletableFuture<>();
+
+        EventListener<IndexEventParameters> clo = new EventListener<>() {
+            @Override
+            public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable e) {
+                // Events about other indexes are skipped.
+                if (!idxCanonicalName.equals(parameters.indexName())) {
+                    return false;
+                }
+
+                if (e == null) {
+                    getIdxFut.complete(parameters.index());
+                } else {
+                    getIdxFut.completeExceptionally(e);
+                }
+
+                return true;
+            }
+
+            @Override
+            public void remove(@NotNull Throwable e) {
+                getIdxFut.completeExceptionally(e);
+            }
+        };
+
+        listen(IndexEvent.CREATE, clo);
+
+        // Second check: the index could have been registered between the first check and the listener registration.
+        idx = idxsByName.get(idxCanonicalName);
+
+        // The listener is removed right away if this call has completed the future itself: either with the index found by
+        // the second check, or with null because the index is not configured. Otherwise it is removed once the future completes.
+        if (idx != null && getIdxFut.complete(idx) || !isIndexConfigured(idxCanonicalName) && getIdxFut.complete(null)) {
+            removeListener(IndexEvent.CREATE, clo, null);
+        } else {
+            getIdxFut.thenRun(() -> removeListener(IndexEvent.CREATE, clo, null));
+        }
+
+        return getIdxFut;
+    }
+
+    /**
+     * Checks whether an index with the given name is present in the configuration of any table.
+     *
+     * @param idxName Index name.
+     * @return {@code true} if some table has an index with this name configured, {@code false} otherwise.
+     */
+    private boolean isIndexConfigured(String idxName) {
+        NamedListView<TableView> tables = directProxy(tablesCfg.tables()).value();
+
+        // TODO: IGNITE-15721 Need to review this approach after the ticket would be fixed.
+        // Probably, it won't be required getting configuration of all tables from Metastore.
+        for (String tblName : tables.namedListKeys()) {
+            TableView tbl = tables.get(tblName);
+
+            if (tbl == null) {
+                continue;
+            }
+
+            if (tbl.indices().get(idxName) != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Waits for the future under the busy lock and returns its result. A {@link CompletionException} thrown by the wait
+     * is replaced with its cause, converted by {@link #convertThrowable(Throwable)}.
+     *
+     * @param future Completable future.
+     * @return Future result.
+     */
+    private <T> T join(CompletableFuture<T> future) {
+        if (busyLock.enterBusy()) {
+            try {
+                return future.join();
+            } catch (CompletionException e) {
+                throw convertThrowable(e.getCause());
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        // The busy lock could not be entered.
+        throw new IgniteException(new NodeStoppingException());
+    }
+
+    /**
+     * Converts a throwable to one that can be thrown without declaration: runtime exceptions are returned as is,
+     * anything else is wrapped into {@link IgniteException}.
+     *
+     * @param th Throwable.
+     * @return Public throwable.
+     */
+    private RuntimeException convertThrowable(Throwable th) {
+        return th instanceof RuntimeException ? (RuntimeException) th : new IgniteException(th);
+    }
+
+    /**
+     * Gets a cause exception for a client.
+     *
+     * <p>A {@link CompletionException} is replaced with its cause, and a {@link ConfigurationChangeException} found there is
+     * replaced with its own cause. A wrapper that has no cause is kept, so that the error is never lost.
+     *
+     * @param t Exception wrapper.
+     * @return A root exception which will be acceptable to throw for public API.
+     */
+    //TODO: IGNITE-16051 Implement exception converter for public API.
+    private @NotNull IgniteException getRootCause(Throwable t) {
+        Throwable ex = t;
+
+        if (t instanceof CompletionException && t.getCause() != null) {
+            ex = t.getCause();
+
+            if (ex instanceof ConfigurationChangeException && ex.getCause() != null) {
+                ex = ex.getCause();
+            }
+        }
+
+        return ex instanceof IgniteException ? (IgniteException) ex : new IgniteException(ex);
+    }
+
+    /**
+     * Creates an index on this node: builds its descriptor, creates the index storage, subscribes the index to the table
+     * rows, registers it by name and fires {@link IndexEvent#CREATE}.
+     *
+     * @param tbl     Indexed table.
+     * @param idxView Index configuration.
+     */
+    private void createIndexLocally(TableImpl tbl, SortedIndexView idxView) {
+        SchemaDescriptor schema = tbl.schemaView().schema();
+
+        // Configured index columns go first, then the key columns; a column named in both places is taken once.
+        Stream<String> configuredColNames = idxView.columns().namedListKeys().stream();
+        Stream<String> keyColNames = Arrays.stream(schema.keyColumns().columns()).map(Column::name);
+
+        List<SortedIndexColumnDescriptor> colDescriptors = Stream.concat(configuredColNames, keyColNames)
+                .distinct()
+                .map(colName -> {
+                    IndexColumnView colView = idxView.columns().get(colName);
+
+                    // Columns that are not configured for the index (added key columns) are sorted in ascending order.
+                    boolean asc = colView == null || colView.asc();
+
+                    return new SortedIndexColumnDescriptor(schema.column(colName), new SortedIndexColumnCollation(asc));
+                })
+                .collect(Collectors.toList());
+
+        SortedIndexDescriptor descriptor = new SortedIndexDescriptor(
+                idxView.name(),
+                colDescriptors,
+                schema.keyColumns().columns()
+        );
+
+        InternalSortedIndexImpl sortedIdx = new InternalSortedIndexImpl(
+                idxView.name(),
+                tbl.internalTable().storage().createSortedIndex(descriptor),
+                tbl
+        );
+
+        tbl.addRowListener(sortedIdx);
+
+        idxsByName.put(sortedIdx.name(), sortedIdx);
+
+        fireEvent(IndexEvent.CREATE, new IndexEventParameters(tbl.name(), sortedIdx), null);
+    }
+}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java
new file mode 100644
index 0000000..b7c598c
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.StorageRowListener;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted index implementation: keeps the index storage up to date on table row changes and serves index range scans.
+ */
+public class InternalSortedIndexImpl implements InternalSortedIndex, StorageRowListener {
+    private final String name;
+
+    private final TableImpl tbl;
+
+    private final SortedIndexStorage store;
+
+    private final SortedIndexDescriptor desc;
+
+    /**
+     * Creates a sorted index on top of the given storage; the index descriptor is taken from the storage.
+     */
+    public InternalSortedIndexImpl(String name, SortedIndexStorage store, TableImpl tbl) {
+        this.name = name;
+        this.store = store;
+        this.tbl = tbl;
+
+        desc = store.indexDescriptor();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID tableId() {
+        return tbl.tableId();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SortedIndexDescriptor descriptor() {
+        return desc;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Cursor<Tuple> scan(Tuple low, Tuple up, byte scanBoundMask, BitSet proj) {
+        // NOTE: scanBoundMask is not used yet; the bounds are passed to the storage as is.
+        Cursor<IndexRow> cur = store.range(
+                low != null ? new IndexRowPrefixTuple(low) : null,
+                up != null ? new IndexRowPrefixTuple(up) : null,
+                r -> true
+        );
+
+        List<String> tblColIds = new ArrayList<>(tbl.schemaView().schema().columnNames());
+        Set<String> idxColIds = desc.columns().stream().map(SortedIndexColumnDescriptor::column).map(Column::name)
+                .collect(Collectors.toSet());
+
+        boolean needLookup = proj.stream().anyMatch(order -> !idxColIds.contains(tblColIds.get(order)));
+
+        final TupleFactory tupleFactory = new TupleFactory(tbl.schemaView().lastSchemaVersion());
+
+        return new IndexCursor(
+                cur,
+                needLookup ? r -> convertWithTableLookup(r, tupleFactory) : r -> convertIndexedOnly(r, tupleFactory)
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void drop() {
+        tbl.internalTable().storage().dropIndex(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+        // Remove the old entry before inserting the new one: if the update does not change the indexed columns,
+        // both rows map to equal index rows and a removal issued after the insertion would drop the fresh entry.
+        if (oldRow != null) {
+            Tuple tupleOld = TableRow.tuple(tbl.schemaView().resolve(oldRow));
+
+            store.remove(new IndexRowTuple(tupleOld, oldRow.keyRow(), partId));
+        }
+
+        Tuple tupleNew = TableRow.tuple(tbl.schemaView().resolve(newRow));
+
+        store.put(new IndexRowTuple(tupleNew, newRow.keyRow(), partId));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onRemove(BinaryRow row, int partId) {
+        Tuple t = TableRow.tuple(tbl.schemaView().resolve(row));
+
+        store.remove(new IndexRowTuple(t, row.keyRow(), partId));
+    }
+
+    /** Builds the result tuple from the index row alone (index-only scan): only the indexed columns are filled. */
+    private Tuple convertIndexedOnly(IndexRow r, TupleFactory tupleFactory) {
+        Tuple t = tupleFactory.create();
+
+        for (int i = 0; i < desc.columns().size(); ++i) {
+            t.set(desc.columns().get(i).column().name(), r.value(i));
+        }
+
+        return t;
+    }
+
+    /** Builds the result tuple from the full table row that is looked up by the primary key of the index row. */
+    private Tuple convertWithTableLookup(IndexRow r, TupleFactory tupleFactory) {
+        try {
+            BinaryRow tblRow = tbl.internalTable().get(r.primaryKey(), null).get();
+
+            Tuple tblTuple = TableRow.tuple(tbl.schemaView().resolve(tblRow));
+
+            Tuple res = tupleFactory.create();
+
+            for (int i = 0; i < res.columnCount(); ++i) {
+                res.set(res.columnName(i), tblTuple.value(res.columnName(i)));
+            }
+
+            return res;
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // Keep the interruption visible to the caller.
+            }
+
+            throw new IgniteInternalException("Error on row lookup by index PK "
+                    + "[index=" + name + ", indexRow=" + r + ']', e);
+        }
+    }
+
+    /** Creates table tuples that hold all key and value columns of the given schema version, initialized with nulls. */
+    private class TupleFactory {
+        private final List<String> cols;
+
+        TupleFactory(int tblSchemaVersion) {
+            cols = Stream.concat(
+                            Arrays.stream(tbl.schemaView().schema(tblSchemaVersion).keyColumns().columns()),
+                            Arrays.stream(tbl.schemaView().schema(tblSchemaVersion).valueColumns().columns())
+                    )
+                    .sorted(Comparator.comparing(Column::columnOrder))
+                    .map(Column::name)
+                    .collect(Collectors.toList());
+        }
+
+        Tuple create() {
+            Tuple t = Tuple.create();
+
+            for (String colName : cols) {
+                t.set(colName, null);
+            }
+
+            return t;
+        }
+    }
+
+    /** Index row backed by a table row tuple: indexed values are read from the tuple by the index column name. */
+    private class IndexRowTuple implements IndexRow {
+        private final Tuple tuple;
+
+        private final BinaryRow pk;
+
+        private final int part;
+
+        IndexRowTuple(Tuple t, BinaryRow pk, int part) {
+            this.tuple = t;
+            this.pk = pk;
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public BinaryRow primaryKey() {
+            return pk;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int partition() {
+            return part;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Object value(int idxColOrder) {
+            return tuple.value(desc.columns().get(idxColOrder).column().name());
+        }
+    }
+
+    /** Index row prefix backed by a tuple: values are read from the tuple by position. */
+    private static class IndexRowPrefixTuple implements IndexRowPrefix {
+        private final Tuple tuple;
+
+        IndexRowPrefixTuple(Tuple t) {
+            this.tuple = t;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Object value(int idxColOrder) {
+            return tuple.value(idxColOrder);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int length() {
+            return tuple.columnCount();
+        }
+    }
+
+    /** Index store cursor adapter: exposes the storage cursor of index rows as a cursor of tuples. */
+    private static class IndexCursor implements Cursor<Tuple> {
+        private final Cursor<IndexRow> cur;
+        private final IndexIterator it;
+
+        IndexCursor(Cursor<IndexRow> cur, Function<IndexRow, Tuple> rowConverter) {
+            this.cur = cur;
+            it = new IndexIterator(cur.iterator(), rowConverter);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void close() throws Exception {
+            cur.close();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull
+        @Override
+        public Iterator<Tuple> iterator() {
+            return it;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Tuple next() {
+            return it.next();
+        }
+    }
+
+    /** Index store iterator adapter: converts every index row with the given converter when it is requested. */
+    private static class IndexIterator implements Iterator<Tuple> {
+        private final Iterator<IndexRow> it;
+
+        private final Function<IndexRow, Tuple> rowConverter;
+
+        private IndexIterator(Iterator<IndexRow> it, Function<IndexRow, Tuple> rowConverter) {
+            this.it = it;
+            this.rowConverter = rowConverter;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Tuple next() {
+            return rowConverter.apply(it.next());
+        }
+    }
+}
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index a5aa970..10b5735 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -95,6 +95,16 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-index-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-index</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>info.picocli</groupId>
             <artifactId>picocli</artifactId>
         </dependency>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 9d896ec..2ca14bc 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -595,13 +595,14 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      * @param shortTableName Table name.
      */
     protected void addIndex(Ignite node, String schemaName, String shortTableName) {
-        IndexDefinition idx = SchemaBuilders.hashIndex("testHI")
-                .withColumns("valInt", "valStr")
+        IndexDefinition idx = SchemaBuilders.sortedIndex("testHI")
+                .addIndexColumn("valInt").done()
+                .addIndexColumn("valStr").done()
                 .build();
 
         node.tables().alterTable(schemaName + "." + shortTableName, chng -> chng.changeIndices(idxes -> {
             if (idxes.get(idx.name()) != null) {
-                log.info("Index already exists [naem={}]", idx.name());
+                log.info("Index already exists [name={}]", idx.name());
 
                 throw new IndexAlreadyExistsException(idx.name());
             }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java
new file mode 100644
index 0000000..45315eb
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for index DDL: CREATE INDEX / DROP INDEX statements and the usage of created indexes in queries.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItIndexDdlTest extends AbstractBasicIntegrationTest {
+    /**
+     * Checks index-only scan, index scan with table lookup, and that a dropped index is no longer used by the plan.
+     */
+    @Test
+    public void indexBasic() {
+        sql("create table test_tbl (id int primary key, val0 int, val1 varchar, val2 int)");
+
+        sql("create index TEST_IDX on test_tbl (val1, val0)");
+
+        insertData(
+                "PUBLIC.TEST_TBL",
+                new String[] {"ID", "VAL0", "VAL1", "VAL2"},
+                new Object[] {0, 1, "val0", 0},
+                new Object[] {1, 2, "val1", 1},
+                new Object[] {2, 3, "val2", 2},
+                new Object[] {3, null, "val3", 3}
+        );
+
+        // Scan index only
+        assertQuery("SELECT VAL0, ID FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+                .matches(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX"))
+                .ordered()
+                .returns(2, 1)
+                .returns(3, 2)
+                .check();
+
+        // Scan index with lookup rows at the table
+        assertQuery("SELECT * FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+                .ordered()
+                .matches(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX"))
+                .returns(1, 2, "val1", 1)
+                .returns(2, 3, "val2", 2)
+                .check();
+
+        sql("drop index TEST_IDX");
+
+        assertQuery("SELECT * FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+                .ordered()
+                .matches(not(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX")))
+                .matches(containsTableScan("PUBLIC", "TEST_TBL"))
+                .returns(1, 2, "val1", 1)
+                .returns(2, 3, "val2", 2)
+                .check();
+    }
+
+    /**
+     * Tests create / drop index through DDL.
+     */
+    @Test
+    public void createDropIndex() {
+        String tblName = "createDropIdx";
+
+        sql(String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", tblName));
+
+        sql(String.format("CREATE INDEX index1 ON %s (c1)", tblName));
+
+        sql(String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", tblName));
+
+        sql(String.format("CREATE INDEX index2 ON %s (c1)", tblName));
+
+        sql(String.format("CREATE INDEX index3 ON %s (c2)", tblName));
+
+        assertThrows(IndexAlreadyExistsException.class, () ->
+                sql(String.format("CREATE INDEX index3 ON %s (c1)", tblName)));
+
+        assertThrows(TableNotFoundException.class, () ->
+                sql(String.format("CREATE INDEX index_3 ON %s (c1)", tblName + "_nonExist")));
+
+        sql(String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", tblName));
+
+        sql("DROP INDEX index4");
+
+        sql(String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", tblName));
+
+        sql("DROP INDEX index4");
+        sql("DROP INDEX IF EXISTS index4");
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0a5afe3..cdf2ec1 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
 import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.IndexManagerImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -131,6 +133,9 @@ public class IgniteImpl implements Ignite {
     /** Distributed table manager. */
     private final TableManager distributedTblMgr;
 
+    /** Index manager. */
+    private final IndexManagerImpl idxManager;
+
     /** Rest module. */
     private final RestModule restModule;
 
@@ -221,9 +226,15 @@ public class IgniteImpl implements Ignite {
                 txManager
         );
 
+        idxManager = new IndexManagerImpl(
+                distributedTblMgr,
+                clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+        );
+
         qryEngine = new SqlQueryProcessor(
                 clusterSvc,
-                distributedTblMgr
+                distributedTblMgr,
+                idxManager
         );
 
         restModule = new RestModule(nodeCfgMgr, clusterCfgMgr, nettyBootstrapFactory);
@@ -318,6 +329,7 @@ public class IgniteImpl implements Ignite {
                     clusterCfgMgr,
                     baselineMgr,
                     distributedTblMgr,
+                    idxManager,
                     qryEngine,
                     restModule,
                     clientHandlerModule
@@ -352,7 +364,7 @@ public class IgniteImpl implements Ignite {
     public void stop() {
         if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
             doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
-                    distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
+                    idxManager, distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
         }
     }
 
@@ -366,6 +378,10 @@ public class IgniteImpl implements Ignite {
         return qryEngine;
     }
 
+    public IndexManager indexManager() {
+        return idxManager;
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteTransactions transactions() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 985d92e..7dce744 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -74,6 +74,12 @@ public interface BinaryRow {
     ByteBuffer valueSlice();
 
     /**
+     * Returns a binary row that contains only the key fields of this row.
+     * It is used to look up the full row in a table by its key.
+     */
+    BinaryRow keyRow();
+
+    /**
      * Writes binary row to given stream.
      *
      * @param stream Stream to write to.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index ca41aa2..6965227 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -173,6 +173,26 @@ public class ByteBufferRow implements BinaryRow {
     }
 
     /** {@inheritDoc} */
+    @Override
+    public BinaryRow keyRow() {
+        // Everything up to the end of the key chunk (the same bound the row is cut at by the key chunk length).
+        final int keyRowLen = KEY_CHUNK_OFFSET + readInteger(KEY_CHUNK_OFFSET);
+
+        // Copy the bytes: ByteBuffer.array() exposes the whole backing array, so wrapping it would share
+        // memory with this row and setting the flag below would mark this row as value-less too.
+        byte[] keyBytes = new byte[keyRowLen];
+        ByteBuffer src = buf.duplicate(); // Independent position/limit, bounds of 'buf' stay untouched.
+        src.position(0);
+        src.get(keyBytes);
+
+        // The byte order must be set before the flags are read and written back.
+        ByteBuffer res = ByteBuffer.wrap(keyBytes).order(ByteOrder.LITTLE_ENDIAN);
+        res.putShort(FLAGS_FIELD_OFFSET, (short) (res.getShort(FLAGS_FIELD_OFFSET) | RowFlags.NO_VALUE_FLAG));
+
+        return new ByteBufferRow(res);
+    }
+
+    /** {@inheritDoc} */
     @Override public byte[] bytes() {
         // TODO IGNITE-15934 avoid copy.
         byte[] tmp = new byte[buf.limit()];
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
index 3bd724c..21b3ee2 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
@@ -251,6 +251,16 @@ public class Column {
         return new Column(schemaIndex, columnOrder, name, type, nullable, defValSup);
     }
 
+    /**
+     * Copies this column with a new column order; the schema index of the copy is reset to {@code -1}.
+     *
+     * @param order Column order.
+     * @return Column.
+     */
+    public Column copyWithOrder(int order) {
+        return new Column(-1, order, name, type, nullable, defValSup);
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
index 84500b5..8389e0a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.schema.definition.builder;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -34,7 +34,7 @@ import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
  */
 public class SortedIndexDefinitionBuilderImpl extends AbstractIndexBuilder implements SortedIndexDefinitionBuilder {
     /** Index columns. */
-    protected final Map<String, SortedIndexColumnBuilderImpl> cols = new HashMap<>();
+    protected final Map<String, SortedIndexColumnBuilderImpl> cols = new LinkedHashMap<>();
 
     /**
      * Constructor.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 9ed3563..1b37772 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -837,6 +837,12 @@ public class Row implements BinaryRow, SchemaAware {
 
     /** {@inheritDoc} */
     @Override
+    public BinaryRow keyRow() {
+        return row.keyRow();
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public void writeTo(OutputStream stream) throws IOException {
         row.writeTo(stream);
     }
diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml
index 8520a4e..d13b07c 100644
--- a/modules/sql-engine/pom.xml
+++ b/modules/sql-engine/pom.xml
@@ -53,6 +53,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-index-api</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.github.ben-manes.caffeine</groupId>
             <artifactId>caffeine</artifactId>
         </dependency>
@@ -184,6 +189,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-index</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- Logging in tests -->
         <dependency>
             <groupId>org.slf4j</groupId>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 88ed145..001d085 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -25,6 +25,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -58,7 +61,9 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private final ClusterService clusterSrvc;
 
-    private final TableManager tableManager;
+    private final TableManager tblManager;
+
+    private final IndexManager idxManager;
 
     /** Busy lock for stop synchronisation. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -66,8 +71,11 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Keeps queries plans to avoid expensive planning of the same queries. */
     private final QueryPlanCache planCache = new QueryPlanCacheImpl(PLAN_CACHE_SIZE);
 
-    /** Event listeners to close. */
-    private final List<Pair<TableEvent, EventListener<TableEventParameters>>> evtLsnrs = new ArrayList<>();
+    /** Table event listeners to close. */
+    private final List<Pair<TableEvent, EventListener<TableEventParameters>>> tblEvtLsnrs = new ArrayList<>();
+
+    /** Index event listeners to close. */
+    private final List<Pair<IndexEvent, EventListener<IndexEventParameters>>> idxEvtLsnrs = new ArrayList<>();
 
     private volatile ExecutionService executionSrvc;
 
@@ -77,12 +85,21 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private volatile Map<String, SqlExtension> extensions;
 
+    /**
+     * Create SQL query processor.
+     *
+     * @param clusterSrvc  Cluster service.
+     * @param tblManager Table manager.
+     * @param idxManager   Index manager.
+     */
     public SqlQueryProcessor(
             ClusterService clusterSrvc,
-            TableManager tableManager
+            TableManager tblManager,
+            IndexManager idxManager
     ) {
         this.clusterSrvc = clusterSrvc;
-        this.tableManager = tableManager;
+        this.tblManager = tblManager;
+        this.idxManager = idxManager;
     }
 
     /** {@inheritDoc} */
@@ -106,14 +123,15 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         extensions = extensionList.stream().collect(Collectors.toMap(SqlExtension::name, Function.identity()));
 
-        SqlSchemaManagerImpl schemaHolder = new SqlSchemaManagerImpl(tableManager, planCache::clear);
+        SqlSchemaManagerImpl schemaHolder = new SqlSchemaManagerImpl(tblManager, planCache::clear);
 
         executionSrvc = new ExecutionServiceImpl<>(
                 clusterSrvc.topologyService(),
                 msgSrvc,
                 planCache,
                 schemaHolder,
-                tableManager,
+                tblManager,
+                idxManager,
                 taskExecutor,
                 ArrayRowHandler.INSTANCE,
                 extensions
@@ -123,6 +141,9 @@ public class SqlQueryProcessor implements QueryProcessor {
         registerTableListener(TableEvent.ALTER, new TableUpdatedListener(schemaHolder));
         registerTableListener(TableEvent.DROP, new TableDroppedListener(schemaHolder));
 
+        registerIndexListener(IndexEvent.CREATE, new IndexCreatedListener(schemaHolder));
+        registerIndexListener(IndexEvent.DROP, new IndexDroppedListener(schemaHolder));
+
         taskExecutor.start();
         msgSrvc.start();
         executionSrvc.start();
@@ -132,9 +153,15 @@ public class SqlQueryProcessor implements QueryProcessor {
     }
 
     private void registerTableListener(TableEvent evt, AbstractTableEventListener lsnr) {
-        evtLsnrs.add(Pair.of(evt, lsnr));
+        tblEvtLsnrs.add(Pair.of(evt, lsnr));
+
+        tblManager.listen(evt, lsnr);
+    }
+
+    private void registerIndexListener(IndexEvent evt, AbstractIndexEventListener lsnr) {
+        idxEvtLsnrs.add(Pair.of(evt, lsnr));
 
-        tableManager.listen(evt, lsnr);
+        idxManager.listen(evt, lsnr);
     }
 
     /** {@inheritDoc} */
@@ -160,8 +187,12 @@ public class SqlQueryProcessor implements QueryProcessor {
                 planCache::stop
         );
 
-        Stream<AutoCloseable> closableListeners = evtLsnrs.stream()
-                .map((p) -> () -> tableManager.removeListener(p.left, p.right));
+        Stream<AutoCloseable> closableListeners = Stream.concat(
+                tblEvtLsnrs.stream()
+                    .map((p) -> () -> tblManager.removeListener(p.left, p.right)),
+                idxEvtLsnrs.stream()
+                        .map((p) -> () -> idxManager.removeListener(p.left, p.right))
+        );
 
         toClose.addAll(
                 Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())
@@ -187,9 +218,7 @@ public class SqlQueryProcessor implements QueryProcessor {
     private abstract static class AbstractTableEventListener implements EventListener<TableEventParameters> {
         protected final SqlSchemaManagerImpl schemaHolder;
 
-        private AbstractTableEventListener(
-                SqlSchemaManagerImpl schemaHolder
-        ) {
+        private AbstractTableEventListener(SqlSchemaManagerImpl schemaHolder) {
             this.schemaHolder = schemaHolder;
         }
 
@@ -201,15 +230,17 @@ public class SqlQueryProcessor implements QueryProcessor {
     }
 
     private static class TableCreatedListener extends AbstractTableEventListener {
-        private TableCreatedListener(
-                SqlSchemaManagerImpl schemaHolder
-        ) {
+        private TableCreatedListener(SqlSchemaManagerImpl schemaHolder) {
             super(schemaHolder);
         }
 
         /** {@inheritDoc} */
         @Override
         public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+            if (exception != null) {
+                return false;
+            }
+
             schemaHolder.onTableCreated(
                     "PUBLIC",
                     parameters.table()
@@ -220,15 +251,17 @@ public class SqlQueryProcessor implements QueryProcessor {
     }
 
     private static class TableUpdatedListener extends AbstractTableEventListener {
-        private TableUpdatedListener(
-                SqlSchemaManagerImpl schemaHolder
-        ) {
+        private TableUpdatedListener(SqlSchemaManagerImpl schemaHolder) {
             super(schemaHolder);
         }
 
         /** {@inheritDoc} */
         @Override
         public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+            if (exception != null) {
+                return false;
+            }
+
             schemaHolder.onTableUpdated(
                     "PUBLIC",
                     parameters.table()
@@ -239,15 +272,17 @@ public class SqlQueryProcessor implements QueryProcessor {
     }
 
     private static class TableDroppedListener extends AbstractTableEventListener {
-        private TableDroppedListener(
-                SqlSchemaManagerImpl schemaHolder
-        ) {
+        private TableDroppedListener(SqlSchemaManagerImpl schemaHolder) {
             super(schemaHolder);
         }
 
         /** {@inheritDoc} */
         @Override
         public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+            if (exception != null) {
+                return false;
+            }
+
             schemaHolder.onTableDropped(
                     "PUBLIC",
                     parameters.tableName()
@@ -256,4 +291,62 @@ public class SqlQueryProcessor implements QueryProcessor {
             return false;
         }
     }
+
+    private abstract static class AbstractIndexEventListener implements EventListener<IndexEventParameters> {
+        protected final SqlSchemaManagerImpl schemaHolder; // Schema manager that subclasses notify about index events.
+
+        private AbstractIndexEventListener(SqlSchemaManagerImpl schemaHolder) {
+            this.schemaHolder = schemaHolder;
+        }
+
+        /** {@inheritDoc} This implementation does nothing. */
+        @Override
+        public void remove(@NotNull Throwable exception) {
+            // No-op.
+        }
+    }
+
+    private static class IndexCreatedListener extends AbstractIndexEventListener {
+        private IndexCreatedListener(SqlSchemaManagerImpl schemaHolder) {
+            super(schemaHolder);
+        }
+
+        /**
+         * Registers the created index in the SQL schema; the event is ignored when it carries an error.
+         *
+         * @param parameters Index event parameters.
+         * @param exception Error attached to the event, or {@code null}.
+         * @return Always {@code false}.
+         */
+        @Override
+        public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
+            if (exception == null) {
+                schemaHolder.onIndexCreated("PUBLIC", parameters.tableName(), parameters.index());
+            }
+
+            return false;
+        }
+    }
+
+    private static class IndexDroppedListener extends AbstractIndexEventListener {
+        private IndexDroppedListener(SqlSchemaManagerImpl schemaHolder) {
+            super(schemaHolder);
+        }
+
+        /**
+         * Removes the dropped index from the SQL schema; the event is ignored when it carries an error.
+         *
+         * @param parameters Index event parameters.
+         * @param exception Error attached to the event, or {@code null}.
+         * @return Always {@code false}.
+         */
+        @Override
+        public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
+            if (exception == null) {
+                schemaHolder.onIndexDropped("PUBLIC", parameters.tableName(), parameters.indexName());
+            }
+
+            return false;
+        }
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
index 028525a..783aa9f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
@@ -87,7 +87,9 @@ public abstract class AbstractIndexScan<RowT, IdxRowT> implements Iterable<RowT>
                 this::indexRow2Row
         );
 
-        it = new FilteringIterator<>(it, filters);
+        if (filters != null) {
+            it = new FilteringIterator<>(it, filters);
+        }
 
         if (rowTransformer != null) {
             it = new TransformingIterator<>(it, rowTransformer);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 4ca2422..7b996f3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.ValidationException;
+import org.apache.ignite.internal.idx.IndexManager;
 import org.apache.ignite.internal.sql.engine.ResultSetMetadata;
 import org.apache.ignite.internal.sql.engine.SqlCursor;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
@@ -147,8 +148,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
 
     private final Map<String, SqlExtension> extensions;
 
-    private final TableManager tableManager;
-
     private final DdlCommandHandler ddlCmdHnd;
 
     /**
@@ -161,6 +160,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
             QueryPlanCache planCache,
             SqlSchemaManager sqlSchemaManager,
             TableManager tblManager,
+            IndexManager idxManager,
             QueryTaskExecutor taskExecutor,
             RowHandler<RowT> handler,
             Map<String, SqlExtension> extensions
@@ -171,9 +171,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
         this.sqlSchemaManager = sqlSchemaManager;
         this.taskExecutor = taskExecutor;
         this.extensions = extensions;
-        tableManager = tblManager;
 
-        ddlCmdHnd = new DdlCommandHandler(tableManager);
+        ddlCmdHnd = new DdlCommandHandler(tblManager, idxManager);
 
         locNodeId = topSrvc.localMember().id();
         qryPlanCache = planCache;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java
new file mode 100644
index 0000000..7497276
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Scan over a sorted index that converts bound rows to index tuples and index tuples back to rows.
+ */
+public class IndexScan<RowT> extends AbstractIndexScan<RowT, Tuple> {
+    private final RowFactory<RowT> tableRowFactory;
+
+    /**
+     * Creates index scan. NOTE(review): {@code colocationGrp} is accepted but not used by this constructor.
+     */
+    public IndexScan(
+            IgniteIndex idx,
+            ExecutionContext<RowT> ectx,
+            ColocationGroup colocationGrp,
+            Predicate<RowT> filters,
+            Supplier<RowT> lower,
+            Supplier<RowT> upper,
+            Function<RowT, RowT> rowTransformer,
+            ImmutableBitSet requiredColumns
+    ) {
+        super(
+                ectx,
+                idx.table().getRowType(ectx.getTypeFactory(), requiredColumns),
+                new TreeIndexWrapper(idx.index(), requiredColumns.toBitSet()),
+                filters,
+                lower,
+                upper,
+                rowTransformer
+        );
+
+        tableRowFactory = ectx.rowHandler().factory(ectx.getTypeFactory(), idx.table().getRowType(ectx.getTypeFactory()));
+    }
+
+    /**
+     * Returns the iterator built by the parent scan.
+     *
+     * <p>Stays {@code synchronized}; exceptions thrown by the parent propagate unchanged.
+     */
+    @Override
+    public synchronized Iterator<RowT> iterator() {
+        return super.iterator();
+    }
+
+    /** Converts a bound row into an index tuple whose columns are named {@code idx0}, {@code idx1}, and so on. */
+    @Override
+    protected Tuple row2indexRow(RowT bound) {
+        if (bound == null) {
+            return null;
+        }
+
+        RowHandler<RowT> handler = ectx.rowHandler();
+        int colCnt = handler.columnCount(bound); // Hoisted: the bound row does not change while it is copied.
+        Tuple idxRow = Tuple.create();
+
+        for (int col = 0; col < colCnt; col++) {
+            idxRow.set("idx" + col, handler.get(col, bound));
+        }
+
+        return idxRow;
+    }
+
+    /** Copies the tuple values, position by position, into a row created by the table row factory. */
+    @Override
+    protected RowT indexRow2Row(Tuple t) {
+        RowT row = tableRowFactory.create();
+        RowHandler<RowT> handler = ectx.rowHandler();
+
+        // NOTE(review): the factory is built from the full table row type; confirm positional copy suits projections.
+        for (int col = 0; col < t.columnCount(); col++) {
+            handler.set(col, row, t.value(col));
+        }
+
+        return row;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+        // No-op.
+    }
+
+    /**
+     * Adapter exposing an {@link InternalSortedIndex} through the {@link TreeIndex} interface.
+     */
+    private static class TreeIndexWrapper implements TreeIndex<Tuple> {
+        /** Underlying index. */
+        private final InternalSortedIndex idx;
+
+        /** Columns requested from the index; passed as-is to every {@code scan} call. */
+        private final BitSet requiredColumns;
+
+        /** Creates wrapper for InternalSortedIndex. */
+        private TreeIndexWrapper(InternalSortedIndex idx, BitSet requiredColumns) {
+            this.idx = idx;
+            this.requiredColumns = requiredColumns;
+        }
+
+        /** Scans the underlying index between the given bounds; any failure is rethrown as {@link IgniteException}. */
+        @Override
+        public Cursor<Tuple> find(Tuple lower, Tuple upper) {
+            try {
+                return idx.scan(
+                        lower,
+                        upper,
+                        (byte) (InternalSortedIndex.GREATER_OR_EQUAL | InternalSortedIndex.LESS_OR_EQUAL),
+                        requiredColumns
+                );
+            } catch (Exception e) {
+                throw new IgniteException("Failed to find index rows", e);
+            }
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index f107252..09076ec 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -284,30 +285,36 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteIndexScan rel) {
-        // TODO: fix this
-        //        RexNode condition = rel.condition();
-        //        List<RexNode> projects = rel.projects();
+        RexNode condition = rel.condition();
+        List<RexNode> projects = rel.projects();
 
         InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
         ImmutableBitSet requiredColumns = rel.requiredColumns();
-        //        List<RexNode> lowerCond = rel.lowerBound();
-        //        List<RexNode> upperCond = rel.upperBound();
+        List<RexNode> lowerCond = rel.lowerBound();
+        List<RexNode> upperCond = rel.upperBound();
 
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
-        //        Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
-        //        Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
-        //        Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
-        //        Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
-        //
-        //        IgniteIndex idx = tbl.getIndex(rel.indexName());
-        //
-        //        ColocationGroup group = ctx.group(rel.sourceId());
-
-        Iterable<RowT> rowsIter = (Iterable<RowT>) List.of(new Object[]{0, 0},
-                new Object[]{1, 1}); //idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+        Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
+        Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
+        Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
+        Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
+
+        IgniteIndex idx = tbl.getIndex(rel.indexName());
+
+        ColocationGroup group = ctx.group(rel.sourceId());
+
+        Iterable<RowT> rowsIter = idx.scan(
+                ctx,
+                group,
+                filters,
+                lower,
+                upper,
+                prj,
+                requiredColumns != null ? requiredColumns : ImmutableBitSet.range(rowType.getFieldCount())
+        );
 
         return new ScanNode<>(ctx, rowType, rowsIter);
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
index ecca66f..f6e3155 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
@@ -29,6 +29,8 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -41,6 +43,8 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
 
     protected final Comparator<RowT> comp;
 
+    protected final RelDataType rowType;
+
     private final RelCollation collation;
 
     private final ArrayList<RowT> rows = new ArrayList<>();
@@ -54,11 +58,13 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
      */
     public RuntimeSortedIndex(
             ExecutionContext<RowT> ectx,
+            RelDataType rowType,
             RelCollation collation,
             Comparator<RowT> comp
     ) {
         this.ectx = ectx;
         this.comp = comp;
+        this.rowType = rowType;
 
         assert Objects.nonNull(collation);
 
@@ -207,7 +213,15 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
         /** {@inheritDoc} */
         @Override
         protected RowT row2indexRow(RowT bound) {
-            return bound;
+            RowHandler<RowT> hndl = ectx.rowHandler();
+            RowT rowBound  = hndl.factory(Commons.typeFactory(), rowType).create();
+
+            ImmutableIntList keys = collation.getKeys();
+            for (int i = 0; i < keys.size(); ++i) {
+                hndl.set(keys.get(i), rowBound, hndl.get(i, bound));
+            }
+
+            return rowBound;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 2d30895..f5c0a12 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.schemas.table.ColumnView;
 import org.apache.ignite.configuration.schemas.table.PrimaryKeyView;
 import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.internal.idx.IndexManager;
 import org.apache.ignite.internal.schema.definition.TableDefinitionImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.AbstractTableDdlCommand;
@@ -54,6 +55,7 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.schema.SchemaBuilders;
@@ -64,10 +66,13 @@ import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder.
 
 /** DDL commands handler. */
 public class DdlCommandHandler {
-    private final TableManager tableManager;
+    private final TableManager tblManager;
 
-    public DdlCommandHandler(TableManager tblManager) {
-        tableManager = tblManager;
+    private final IndexManager idxManager;
+
+    public DdlCommandHandler(TableManager tblManager, IndexManager idxManager) {
+        this.tblManager = tblManager;
+        this.idxManager = idxManager;
     }
 
     /** Planning context. */
@@ -154,7 +159,7 @@ public class DdlCommandHandler {
         );
 
         try {
-            tableManager.createTable(fullName, tblChanger);
+            tblManager.createTable(fullName, tblChanger);
         } catch (TableAlreadyExistsException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
@@ -169,7 +174,7 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
         try {
-            tableManager.dropTable(fullName);
+            tblManager.dropTable(fullName);
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
@@ -237,33 +242,35 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
 
-        tableManager.alterTable(fullName, chng -> chng.changeIndices(idxes -> {
-            if (idxes.get(cmd.indexName()) != null) {
-                if (!cmd.ifIndexNotExists()) {
-                    throw new IndexAlreadyExistsException(cmd.indexName());
-                } else {
-                    return;
-                }
+        try {
+            idxManager.createIndex(cmd.indexName(), fullName, idxCh -> convert(idx.build(), idxCh));
+        } catch (IndexAlreadyExistsException e) {
+            if (!cmd.ifIndexNotExists()) {
+                throw e;
             }
-
-            idxes.create(cmd.indexName(), tableIndexChange -> convert(idx.build(), tableIndexChange));
-        }));
+        }
     }
 
     /** Handles drop index command. */
     private void handleDropIndex(DropIndexCommand cmd) {
-        throw new UnsupportedOperationException("DROP INDEX command not supported for now.");
+        try {
+            idxManager.dropIndex(cmd.indexName());
+        } catch (IndexNotFoundException e) {
+            if (!cmd.ifExist()) {
+                throw e;
+            }
+        }
     }
 
     /**
      * Adds a column according to the column definition.
      *
      * @param fullName Table with schema name.
-     * @param colsDef  Columns defenitions.
+     * @param colsDef  Columns definitions.
      * @param colNotExist Flag indicates exceptionally behavior in case of already existing column.
      */
     private void addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean colNotExist) {
-        tableManager.alterTable(
+        tblManager.alterTable(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
                     Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
@@ -304,7 +311,7 @@ public class DdlCommandHandler {
      * @param colExist Flag indicates exceptionally behavior in case of already existing column.
      */
     private void dropColumnInternal(String fullName, Set<String> colNames, boolean colExist) {
-        tableManager.alterTable(
+        tblManager.alterTable(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
                     PrimaryKeyView priKey = chng.primaryKey();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
index 7289111..da0ce40 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
@@ -174,7 +174,7 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo
             Supplier<RowT> lowerIdxBound,
             Supplier<RowT> upperIdxBound
     ) {
-        RuntimeSortedIndex<RowT> idx = new RuntimeSortedIndex<>(ctx, collation, comp);
+        RuntimeSortedIndex<RowT> idx = new RuntimeSortedIndex<>(ctx, rowType, collation, comp);
 
         ScanNode<RowT> scan = new ScanNode<>(
                 ctx,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
index 74539d1..ccada61 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.rule;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -77,7 +78,7 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
             return;
         }
 
-        RelCollation collation = TraitUtils.createCollation(idxCond.keys());
+        RelCollation collation = TraitUtils.createCollation(IntArrayList.of(idxCond.keys().toIntArray()));
 
         RelNode res = new IgniteSortedIndexSpool(
                 cluster,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
index cda9d87..e300413 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
@@ -17,7 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.schema;
 
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.IndexScan;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 
 /**
  * Ignite scannable index.
@@ -27,7 +35,8 @@ public class IgniteIndex {
 
     private final String idxName;
 
-    //    private final GridIndex<H2Row> idx;
+    private final InternalSortedIndex idx;
+
     private final InternalIgniteTable tbl;
 
     /**
@@ -35,8 +44,25 @@ public class IgniteIndex {
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
+        this(collation, name, null, tbl);
+    }
+
+    /**
+     * Creates an index backed by the given sorted index; the index name is taken from {@code idx.name()}.
+     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
+    public IgniteIndex(RelCollation collation, InternalSortedIndex idx, InternalIgniteTable tbl) {
+        this(collation, idx.name(), idx, tbl);
+    }
+
+    /**
+     * Constructor.
+     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
+    private IgniteIndex(RelCollation collation, String name, InternalSortedIndex idx, InternalIgniteTable tbl) {
         this.collation = collation;
         idxName = name;
+        this.idx = idx;
         this.tbl = tbl;
     }
 
@@ -51,4 +77,32 @@ public class IgniteIndex {
     public InternalIgniteTable table() {
         return tbl;
     }
+
+    public InternalSortedIndex index() {
+        return idx; // Null when the instance was created by the name-based constructor, which passes no sorted index.
+    }
+
+    /**
+     * Creates a scan over this index.
+     *
+     * @param ectx Execution context.
+     * @param colocationGrp Colocation group.
+     * @param filters Row filter.
+     * @param lower Lower bound supplier.
+     * @param upper Upper bound supplier.
+     * @param rowTransformer Row transformer.
+     * @param requiredColumns Required columns.
+     * @return New {@link IndexScan} built from this index and the given arguments.
+     */
+    public <RowT> Iterable<RowT> scan(
+            ExecutionContext<RowT> ectx,
+            ColocationGroup colocationGrp,
+            Predicate<RowT> filters,
+            Supplier<RowT> lower,
+            Supplier<RowT> upper,
+            Function<RowT, RowT> rowTransformer,
+            ImmutableBitSet requiredColumns
+    ) {
+        return new IndexScan<>(this, ectx, colocationGrp, filters, lower, upper, rowTransformer, requiredColumns);
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 861f6be..0504dd0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -73,4 +73,14 @@ public class IgniteSchema extends AbstractSchema {
     public void removeTable(String tblName) {
         tblMap.remove(tblName);
     }
+
+    /**
+     * Returns the internal table registered in this schema under the given name.
+     *
+     * @param name Table's name.
+     * @return Internal table as returned by {@code tblMap.get(name)} (presumably {@code null} when absent).
+     */
+    public InternalIgniteTable internalTable(String name) {
+        return tblMap.get(name);
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 08d0251..75f5c59 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.schema;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -192,6 +193,15 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
         indexes.put(idxTbl.name(), idxTbl);
     }
 
+    /**
+     * Adds every index from {@code idxs}, registering each one under its own name.
+     */
+    public void addIndexes(Collection<IgniteIndex> idxs) {
+        for (IgniteIndex idx : idxs) {
+            indexes.put(idx.name(), idx);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteIndex getIndex(String idxName) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index ed33d0b..e5bd3a9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -24,11 +24,16 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.sql.engine.extension.SqlExtension.ExternalCatalog;
@@ -154,8 +159,17 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
             String schemaName,
             TableImpl table
     ) {
+        IgniteTableImpl igniteTable = createTable(schemaName, table);
+
         IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
 
+        schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
+        tablesById.put(igniteTable.id(), igniteTable);
+
+        rebuild();
+    }
+
+    private IgniteTableImpl createTable(String schemaName, TableImpl table) {
         SchemaDescriptor descriptor = table.schemaView().schema();
 
         List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream()
@@ -171,16 +185,11 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                 ))
                 .collect(Collectors.toList());
 
-        IgniteTableImpl igniteTable = new IgniteTableImpl(
+        return new IgniteTableImpl(
                 new TableDescriptorImpl(colDescriptors),
                 table.internalTable(),
                 table.schemaView()
         );
-
-        schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
-        tablesById.put(igniteTable.id(), igniteTable);
-
-        rebuild();
     }
 
     /**
@@ -191,7 +200,22 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
             String schemaName,
             TableImpl table
     ) {
-        onTableCreated(schemaName, table);
+        IgniteTableImpl igniteTable = createTable(schemaName, table);
+
+        IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
+
+        String tblName = removeSchema(schemaName, table.name());
+
+        // Rebuild indexes collation.
+        igniteTable.addIndexes(schema.internalTable(tblName).indexes().values().stream()
+                .map(idx -> createIndex(idx.index(), igniteTable))
+                .collect(Collectors.toList())
+        );
+
+        schema.addTable(tblName, igniteTable);
+        tablesById.put(igniteTable.id(), igniteTable);
+
+        rebuild();
     }
 
     /**
@@ -213,6 +237,49 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
         rebuild();
     }
 
+    /**
+     * Build new SQL schema when new index is created.
+     */
+    public void onIndexCreated(String schema, String tblName, InternalSortedIndex idx) {
+        InternalIgniteTable tbl = igniteSchemas.get(schema).internalTable(removeSchema(schema, tblName));
+
+        tbl.addIndex(createIndex(idx, tbl));
+
+        rebuild();
+    }
+
+    /**
+     * TODO: https://issues.apache.org/jira/browse/IGNITE-15480
+     * Column mapping should be based on column ID instead of column name.
+     */
+    private IgniteIndex createIndex(InternalSortedIndex idx, InternalIgniteTable tbl) {
+        List<RelFieldCollation> idxFieldsCollation = idx.descriptor().columns().stream()
+                .map(c ->
+                        new RelFieldCollation(
+                                tbl.descriptor().columnDescriptor(c.column().name()).logicalIndex(),
+                                c.asc() ? Direction.ASCENDING : Direction.DESCENDING,
+                                NullDirection.FIRST
+                        )
+                ).collect(Collectors.toList());
+
+        return new IgniteIndex(
+                RelCollations.of(idxFieldsCollation),
+                idx,
+                tbl
+        );
+    }
+
+    /**
+     * Build new SQL schema when an existing index is dropped.
+     */
+    public void onIndexDropped(String schema, String tblName, String idxName) {
+        InternalIgniteTable tbl = igniteSchemas.get(schema).internalTable(removeSchema(schema, tblName));
+
+        tbl.removeIndex(idxName);
+
+        rebuild();
+    }
+
     private void rebuild() {
         SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 6a3f816..5ec2a1c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -28,7 +28,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -466,7 +466,7 @@ public class TraitUtils {
      * @param keys The keys to create collation from.
      * @return New collation.
      */
-    public static RelCollation createCollation(IntSet keys) {
+    public static RelCollation createCollation(IntList keys) {
         return RelCollations.of(
                 keys.intStream().mapToObj(RelFieldCollation::new).collect(Collectors.toList())
         );
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
index e1f6612..045031b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.util;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.util.List;
@@ -40,12 +39,14 @@ public class IndexConditions {
 
     private final List<RexNode> upperBound;
 
+    private final IntSet keys;
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public IndexConditions() {
-        this(null, null, null, null);
+        this(null, null, null, null, null);
     }
 
     /**
@@ -56,12 +57,14 @@ public class IndexConditions {
             @Nullable List<RexNode> lowerCond,
             @Nullable List<RexNode> upperCond,
             @Nullable List<RexNode> lowerBound,
-            @Nullable List<RexNode> upperBound
+            @Nullable List<RexNode> upperBound,
+            @Nullable IntSet keys
     ) {
         this.lowerCond = lowerCond;
         this.upperCond = upperCond;
         this.lowerBound = lowerBound;
         this.upperBound = upperBound;
+        this.keys = keys;
     }
 
     /**
@@ -71,6 +74,7 @@ public class IndexConditions {
     public IndexConditions(RelInput input) {
         lowerCond = null;
         upperCond = null;
+        keys = null;
         lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
         upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
     }
@@ -108,22 +112,7 @@ public class IndexConditions {
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     public IntSet keys() {
-        if (upperBound == null && lowerBound == null) {
-            return IntSets.EMPTY_SET;
-        }
-
-        IntSet keys = new IntOpenHashSet();
-
-        int cols = lowerBound != null ? lowerBound.size() : upperBound.size();
-
-        for (int i = 0; i < cols; ++i) {
-            if (upperBound != null && RexUtils.isNotNull(upperBound.get(i))
-                    || lowerBound != null && RexUtils.isNotNull(lowerBound.get(i))) {
-                keys.add(i);
-            }
-        }
-
-        return IntSets.unmodifiable(keys);
+        return  keys == null ? IntSets.EMPTY_SET : keys;
     }
 
     /**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
index 3fdc23c..b699191 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
@@ -310,19 +310,25 @@ public class RexUtils {
         List<RexNode> lowerBound = null;
         List<RexNode> upperBound = null;
 
+        List<RexNode> allIdxCondition = new ArrayList<>();
+
         if (!nullOrEmpty(lower)) {
             lowerBound = asBound(cluster, lower, rowType, mapping);
+            allIdxCondition.addAll(lower);
         } else {
             lower = null;
         }
 
         if (!nullOrEmpty(upper)) {
             upperBound = asBound(cluster, upper, rowType, mapping);
+            allIdxCondition.addAll(upper);
         } else {
             upper = null;
         }
 
-        return new IndexConditions(lower, upper, lowerBound, upperBound);
+        IntSet keys = indexKeys(allIdxCondition, mapping);
+
+        return new IndexConditions(lower, upper, lowerBound, upperBound, keys);
     }
 
     /**
@@ -470,7 +476,8 @@ public class RexUtils {
 
         RexBuilder builder = builder(cluster);
         List<RelDataType> types = RelOptUtil.getFieldTypeList(rowType);
-        List<RexNode> res = makeListOfNullLiterals(builder, types);
+
+        List<RexNode> res = new ArrayList<>();
 
         for (RexNode pred : idxCond) {
             assert pred instanceof RexCall;
@@ -485,13 +492,40 @@ public class RexUtils {
 
             assert index != -1;
 
-            res.set(index, makeCast(builder, cond, types.get(index)));
+            res.add(makeCast(builder, cond, types.get(index)));
         }
 
         return res;
     }
 
     /**
+     * Index keys used by condition.
+     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
+    public static IntSet indexKeys(Iterable<RexNode> idxCond, @Nullable Mappings.TargetMapping mapping) {
+        if (nullOrEmpty(idxCond)) {
+            return null;
+        }
+
+        IntSet keys = new IntOpenHashSet();
+
+        for (RexNode pred : idxCond) {
+            assert pred instanceof RexCall;
+
+            RexCall call = (RexCall) pred;
+            RexSlot ref = (RexSlot) RexUtil.removeCast(call.operands.get(0));
+
+            int index = mapping == null ? ref.getIndex() : mapping.getSourceOpt(ref.getIndex());
+
+            assert index != -1;
+
+            keys.add(index);
+        }
+
+        return keys;
+    }
+
+    /**
      * Permutation.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 042c025..af66f32 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Flow;
+import org.apache.ignite.internal.idx.IndexManager;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -82,6 +83,9 @@ public class StopCalciteModuleTest {
     TableManager tableManager;
 
     @Mock
+    IndexManager idxManager;
+
+    @Mock
     MessagingService msgSrvc;
 
     @Mock
@@ -168,7 +172,7 @@ public class StopCalciteModuleTest {
 
     @Test
     public void testStopQueryOnNodeStop() throws Exception {
-        SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc, tableManager);
+        SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc, tableManager, idxManager);
 
         when(tbl.tableId()).thenReturn(UUID.randomUUID());
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index d234828..b6c44b4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.schema.ExtendedTableConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.IndexManagerImpl;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaUtils;
@@ -57,7 +59,6 @@ import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -72,7 +73,6 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -125,6 +125,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
     TableManager tblManager;
 
+    IndexManager idxManager;
+
     SqlQueryProcessor queryProc;
 
     /** Test node. */
@@ -159,7 +161,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     void before() throws NodeStoppingException {
         tblManager = mockManagers();
 
-        queryProc = new SqlQueryProcessor(cs, tblManager);
+        queryProc = new SqlQueryProcessor(cs, tblManager, idxManager);
 
         queryProc.start();
     }
@@ -378,48 +380,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
     }
 
-    /**
-     * Tests create a table through public API.
-     */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-16032")
-    @Test
-    public void testCreateDropIndex() {
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        String curMethodName = getCurrentMethodName();
-
-        String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", curMethodName);
-
-        queryProc.query("PUBLIC", newTblSql);
-
-        assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
-                .equalsIgnoreCase("PUBLIC." + curMethodName)));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX index1 ON %s (c1)", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX index2 ON %s (c1)", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX index3 ON %s (c2)", curMethodName));
-
-        assertThrows(IndexAlreadyExistsException.class, () ->
-                finalQueryProc.query("PUBLIC", String.format("CREATE INDEX index3 ON %s (c1)", curMethodName)));
-
-        assertThrows(IgniteException.class, () ->
-                finalQueryProc.query("PUBLIC", String.format("CREATE INDEX index_3 ON %s (c1)", curMethodName + "_nonExist")));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName));
-
-        queryProc.query("PUBLIC", String.format("DROP INDEX IF EXISTS index4 ON %s", curMethodName));
-    }
-
     // todo copy-paste from TableManagerTest will be removed after https://issues.apache.org/jira/browse/IGNITE-16050
     /**
      * Instantiates a table and prepares Table manager.
@@ -490,6 +450,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 tm
         );
 
+        idxManager = new IndexManagerImpl(tableManager, mock(TablesConfiguration.class));
+
         tableManager.start();
 
         return tableManager;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 07dbbaf..57901c8 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -118,6 +118,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
                         ArrayRowHandler.INSTANCE,
                         ImmutableMap.of()
                 ),
+                rowType,
                 RelCollations.of(ImmutableIntList.copyOf(idxCols)),
                 (o1, o2) -> {
                     for (int colIdx : idxCols) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index bf27768..351071f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -100,23 +99,21 @@ public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
 
         IgniteIndexScan idxScan = findFirstNode(phys, byClass(IgniteIndexScan.class));
 
+        assertTrue(idxScan.indexConditions().keys().contains(1));
+
         List<RexNode> lowerBound = idxScan.lowerBound();
 
         assertNotNull(lowerBound, "Invalid plan\n" + RelOptUtil.toString(phys));
-        assertEquals(3, lowerBound.size());
+        assertEquals(1, lowerBound.size());
 
-        assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+        assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
 
         List<RexNode> upperBound = idxScan.upperBound();
 
         assertNotNull(upperBound, "Invalid plan\n" + RelOptUtil.toString(phys));
-        assertEquals(3, upperBound.size());
+        assertEquals(1, upperBound.size());
 
-        assertTrue(((RexLiteral) upperBound.get(0)).isNull());
-        assertTrue(((RexLiteral) upperBound.get(2)).isNull());
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+        assertTrue(upperBound.get(0) instanceof RexFieldAccess);
     }
 
     /**
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
index 526fef1..68961c9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
@@ -103,11 +102,9 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
         List<RexNode> searchRow = idxSpool.searchRow();
 
         assertNotNull(searchRow);
-        assertEquals(3, searchRow.size());
+        assertEquals(1, searchRow.size());
 
-        assertTrue(((RexLiteral) searchRow.get(0)).isNull());
-        assertTrue(((RexLiteral) searchRow.get(2)).isNull());
-        assertTrue(searchRow.get(1) instanceof RexFieldAccess);
+        assertTrue(searchRow.get(0) instanceof RexFieldAccess);
     }
 
     @Test
@@ -165,12 +162,10 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
         List<RexNode> searcRow = idxSpool.searchRow();
 
         assertNotNull(searcRow);
-        assertEquals(4, searcRow.size());
+        assertEquals(2, searcRow.size());
 
-        assertTrue(((RexLiteral) searcRow.get(0)).isNull());
+        assertTrue(searcRow.get(0) instanceof RexFieldAccess);
         assertTrue(searcRow.get(1) instanceof RexFieldAccess);
-        assertTrue(searcRow.get(2) instanceof RexFieldAccess);
-        assertTrue(((RexLiteral) searcRow.get(3)).isNull());
     }
 
     /**
@@ -228,10 +223,8 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
         List<RexNode> searchRow = idxSpool.searchRow();
 
         assertNotNull(searchRow);
-        assertEquals(3, searchRow.size());
+        assertEquals(1, searchRow.size());
 
-        assertTrue(((RexLiteral) searchRow.get(0)).isNull());
-        assertTrue(((RexLiteral) searchRow.get(2)).isNull());
-        assertTrue(searchRow.get(1) instanceof RexFieldAccess);
+        assertTrue(searchRow.get(0) instanceof RexFieldAccess);
     }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index 621b44c..ebd9a98 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -98,7 +98,7 @@ public class PlannerTest extends AbstractPlannerTest {
                         .build()) {
             @Override
             public IgniteIndex getIndex(String idxName) {
-                return new IgniteIndex(null, null, null);
+                return new IgniteIndex(null, (String) null, null);
             }
 
             @Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index bbb9e9d..9e6ce34 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -98,23 +97,23 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
 
         IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
 
+        assertNotNull(idxSpool, "Invalid plan: " + RelOptUtil.toString(phys));
+
+        assertTrue(idxSpool.indexCondition().keys().contains(1));
+
         List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
 
         assertNotNull(lowerBound);
-        assertEquals(3, lowerBound.size());
+        assertEquals(1, lowerBound.size());
 
-        assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+        assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
 
         List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
 
         assertNotNull(upperBound);
-        assertEquals(3, upperBound.size());
+        assertEquals(1, upperBound.size());
 
-        assertTrue(((RexLiteral) upperBound.get(0)).isNull());
-        assertTrue(((RexLiteral) upperBound.get(2)).isNull());
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+        assertTrue(upperBound.get(0) instanceof RexFieldAccess);
     }
 
     /**
@@ -175,24 +174,20 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
 
         IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
 
+        assertTrue(idxSpool.indexCondition().keys().contains(1));
+
         List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
 
         assertNotNull(lowerBound);
-        assertEquals(4, lowerBound.size());
+        assertEquals(1, lowerBound.size());
 
-        assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+        assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
 
         List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
 
         assertNotNull(upperBound);
-        assertEquals(4, upperBound.size());
+        assertEquals(1, upperBound.size());
 
-        assertTrue(((RexLiteral) upperBound.get(0)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
-        assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+        assertTrue(upperBound.get(0) instanceof RexFieldAccess);
     }
 }
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index 2543ac8..2bb717b 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>ignite-schema</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-index-api</artifactId>
+        </dependency>
+
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.ignite</groupId>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
similarity index 51%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
index cda9d87..6ae62ba 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
@@ -15,40 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.schema;
+package org.apache.ignite.internal.storage.engine;
 
-import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Ignite scannable index.
+ * Listens to storage changes.
  */
-public class IgniteIndex {
-    private final RelCollation collation;
-
-    private final String idxName;
-
-    //    private final GridIndex<H2Row> idx;
-    private final InternalIgniteTable tbl;
+public interface StorageRowListener {
+    StorageRowListener NO_OP = new StorageRowListener() {
+        @Override
+        public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow) {
+            // No-op.
+        }
+
+        @Override
+        public void onRemove(BinaryRow row) {
+            // No-op.
+        }
+    };
 
     /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Called when row is updated.
+     *
+     * @param oldRow Old row.
+     * @param newRow New row.
      */
-    public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
-        this.collation = collation;
-        idxName = name;
-        this.tbl = tbl;
-    }
-
-    public RelCollation collation() {
-        return collation;
-    }
+    void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow);
 
-    public String name() {
-        return idxName;
-    }
-
-    public InternalIgniteTable table() {
-        return tbl;
-    }
+    /**
+     * Called when row is removed.
+     *
+     * @param row Removed row.
+     */
+    void onRemove(BinaryRow row);
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index 549c34e..43bdf33 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.engine;
 
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -62,12 +63,12 @@ public interface TableStorage {
      * <p>A prerequisite for calling this method is to have the index already configured under the same name in the Table Configuration
      * (see {@link #configuration()}).
      *
-     * @param indexName Index name.
+     * @param desc Index descriptor.
      * @return Sorted Index storage.
      * @throws StorageException if no index has been configured under the given name or it has been configured incorrectly (e.g. it was
      *                          configured as a Hash Index).
      */
-    SortedIndexStorage getOrCreateSortedIndex(String indexName);
+    SortedIndexStorage createSortedIndex(SortedIndexDescriptor desc);
 
     /**
      * Destroys the index under the given name and all data in it.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
index 5213d9c..cb158cc 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
@@ -17,26 +17,34 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
 
 /**
  * Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
  */
 public interface IndexRow {
     /**
-     * Returns the serialized presentation of this row as a byte array.
+     * Returns the Primary Key that is a part of this row.
+     *
+     * <p>This is a convenience method for easier extraction of the Primary Key to use it for accessing the {@link PartitionStorage}.
      *
-     * @return Serialized byte array value.
+     * @return Primary key of the associated {@link PartitionStorage}.
      */
-    byte[] rowBytes();
+    BinaryRow primaryKey();
 
     /**
-     * Returns the Primary Key that is a part of this row.
+     * Returns the partition number that corresponds to the Primary Key.
      *
-     * <p>This is a convenience method for easier extraction of the Primary Key to use it for accessing the {@link PartitionStorage}.
+     * @return Partition number that corresponds to the Primary Key.
+     */
+    int partition();
+
+    /**
+     * Returns the value of the indexed column at the given position in the index column order.
      *
-     * @return Primary key of the associated {@link PartitionStorage}.
+     * @param idxColOrder Index column order.
+     * @return Value of the indexed column.
      */
-    SearchRow primaryKey();
+    Object value(int idxColOrder);
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
index 602cfaf..c041bf3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
@@ -18,12 +18,21 @@
 package org.apache.ignite.internal.storage.index;
 
 /**
- * Represents an Index row prefix, used to perform prefix scans over the Sorted Index storage.
+ * Represents an Index row prefix - leading indexed column values used to perform prefix scans over the Sorted Index storage.
  */
 public interface IndexRowPrefix {
     /**
-     * Returns a list of column values that comprise a prefix of an Index row. Values will be sorted in the same order as the
-     * Sorted Index columns, specified by {@link SortedIndexDescriptor#indexRowColumns()}.
+     * Returns the value of the prefix column at the given position in the index column order.
+     *
+     * @param idxColOrder Index column order.
+     * @return Value of the indexed column.
      */
-    Object[] prefixColumnValues();
+    Object value(int idxColOrder);
+
+    /**
+     * Returns prefix length.
+     *
+     * @return prefix length.
+     */
+    int length();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
similarity index 57%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
index 9d7ca85..bc4763e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
@@ -15,12 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 
 /**
- * The event cas which is produced by event producer component.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * Schema descriptor of an index row: all index columns are key columns and there are no value columns.
  */
-public interface Event {
+public class IndexSchemaDescriptor extends SchemaDescriptor {
+    private static final Column[] EMPTY_COLUMNS_ARRAY = new Column[0];
+
+    public IndexSchemaDescriptor(Column[] cols) {
+        super(0, cols, EMPTY_COLUMNS_ARRAY);
+    }
+
+    /**
+     * Returns index columns.
+     */
+    public Column[] columns() {
+        return keyColumns().columns();
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
deleted file mode 100644
index ac25744..0000000
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.index;
-
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toUnmodifiableList;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Stream;
-import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.schemas.table.ColumnView;
-import org.apache.ignite.configuration.schemas.table.IndexColumnView;
-import org.apache.ignite.configuration.schemas.table.SortedIndexView;
-import org.apache.ignite.configuration.schemas.table.TableIndexView;
-import org.apache.ignite.configuration.schemas.table.TableView;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.schema.definition.ColumnDefinition;
-
-/**
- * Descriptor for creating a Sorted Index Storage.
- *
- * @see SortedIndexStorage
- */
-public class SortedIndexDescriptor {
-    /**
-     * Descriptor of a Sorted Index column (column name and column sort order).
-     */
-    public static class ColumnDescriptor {
-        private final Column column;
-
-        private final boolean asc;
-
-        private final boolean indexedColumn;
-
-        private final boolean nullable;
-
-        ColumnDescriptor(Column column, boolean asc, boolean indexedColumn, boolean nullable) {
-            this.column = column;
-            this.asc = asc;
-            this.indexedColumn = indexedColumn;
-            this.nullable = nullable;
-        }
-
-        /**
-         * Returns a column descriptor.
-         */
-        public Column column() {
-            return column;
-        }
-
-        /**
-         * Returns {@code true} if this column is sorted in ascending order or {@code false} otherwise.
-         */
-        public boolean asc() {
-            return asc;
-        }
-
-        /**
-         * Returns {@code true} if this column was explicitly marked as an indexed column or {@code false} if it is a part of a Primary Key
-         * appended for uniqueness.
-         */
-        public boolean indexedColumn() {
-            return indexedColumn;
-        }
-
-        /**
-         * Returns {@code true} if this column can contain null values or {@code false} otherwise.
-         */
-        public boolean nullable() {
-            return nullable;
-        }
-
-        @Override
-        public String toString() {
-            return S.toString(this);
-        }
-    }
-
-    private final String name;
-
-    private final List<ColumnDescriptor> columns;
-
-    private final SchemaDescriptor schemaDescriptor;
-
-    /**
-     * Creates an Index Descriptor from a given Table Configuration.
-     *
-     * @param name        index name.
-     * @param tableConfig table configuration.
-     */
-    public SortedIndexDescriptor(String name, TableView tableConfig) {
-        this.name = name;
-
-        TableIndexView indexConfig = tableConfig.indices().get(name);
-
-        if (indexConfig == null) {
-            throw new StorageException(String.format("Index configuration for \"%s\" could not be found", name));
-        }
-
-        if (!(indexConfig instanceof SortedIndexView)) {
-            throw new StorageException(String.format(
-                    "Index \"%s\" is not configured as a Sorted Index. Actual type: %s",
-                    name, indexConfig.type()
-            ));
-        }
-
-        // extract indexed column configurations from the table configuration
-        NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
-
-        Stream<String> indexColumnNames = indexColumns.namedListKeys().stream();
-
-        // append the primary key to guarantee index key uniqueness
-        Stream<String> primaryKeyColumnNames = Arrays.stream(tableConfig.primaryKey().columns());
-
-        List<ColumnView> indexKeyColumnViews = Stream.concat(indexColumnNames, primaryKeyColumnNames)
-                .distinct() // remove Primary Key columns if they are already present in the index definition
-                .map(columnName -> {
-                    ColumnView columnView = tableConfig.columns().get(columnName);
-
-                    assert columnView != null : "Incorrect index column configuration. " + columnName + " column does not exist";
-
-                    return columnView;
-                })
-                .collect(toList());
-
-        schemaDescriptor = createSchemaDescriptor(indexKeyColumnViews);
-
-        columns = Arrays.stream(schemaDescriptor.keyColumns().columns())
-                .sorted(Comparator.comparingInt(Column::columnOrder))
-                .map(column -> {
-                    IndexColumnView columnView = indexColumns.get(column.name());
-
-                    // if the index config does not contain this column - it's a column from the Primary Key
-                    boolean indexedColumn = columnView != null;
-
-                    // PK columns are always sorted in ascending order
-                    boolean asc = !indexedColumn || columnView.asc();
-
-                    return new ColumnDescriptor(column, asc, indexedColumn, column.nullable());
-                })
-                .collect(toUnmodifiableList());
-    }
-
-    /**
-     * Creates a {@link SchemaDescriptor} from a list of index key columns.
-     */
-    private static SchemaDescriptor createSchemaDescriptor(List<ColumnView> indexKeyColumnViews) {
-        Column[] keyColumns = new Column[indexKeyColumnViews.size()];
-
-        for (int i = 0; i < indexKeyColumnViews.size(); ++i) {
-            ColumnView columnView = indexKeyColumnViews.get(i);
-
-            ColumnDefinition columnDefinition = SchemaConfigurationConverter.convert(columnView);
-
-            keyColumns[i] = SchemaDescriptorConverter.convert(i, columnDefinition);
-        }
-
-        return new SchemaDescriptor(0, keyColumns, new Column[0]);
-    }
-
-    /**
-     * Returns this index' name.
-     */
-    public String name() {
-        return name;
-    }
-
-    /**
-     * Returns the Column Descriptors that comprise a row of this index (indexed columns + primary key columns).
-     */
-    public List<ColumnDescriptor> indexRowColumns() {
-        return columns;
-    }
-
-    /**
-     * Converts this Descriptor into an equivalent {@link SchemaDescriptor}.
-     *
-     * <p>The resulting {@code SchemaDescriptor} will have empty {@link SchemaDescriptor#valueColumns()} and its
-     * {@link SchemaDescriptor#keyColumns()} will be consistent with the columns returned by {@link #indexRowColumns()}.
-     */
-    public SchemaDescriptor asSchemaDescriptor() {
-        return schemaDescriptor;
-    }
-}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index bcbffe2..bbbe9a8 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import java.util.function.Predicate;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.SearchRow;
 import org.apache.ignite.internal.util.Cursor;
 
@@ -35,16 +37,6 @@ public interface SortedIndexStorage extends AutoCloseable {
     SortedIndexDescriptor indexDescriptor();
 
     /**
-     * Returns a factory for creating index rows for this storage.
-     */
-    IndexRowFactory indexRowFactory();
-
-    /**
-     * Returns a class deserializing index columns.
-     */
-    IndexRowDeserializer indexRowDeserializer();
-
-    /**
      * Adds the given index key and {@link SearchRow} to the index.
      *
      * <p>Putting a new value under the same key will overwrite the previous associated value.
@@ -60,9 +52,12 @@ public interface SortedIndexStorage extends AutoCloseable {
 
     /**
      * Returns a range of index values between the lower bound (inclusive) and the upper bound (inclusive).
+     *
+     * @param low Lower bound of the range to scan.
+     * @param up Upper bound of the range to scan.
+     * @param filter Row filter; typically used to filter rows by partition number.
      */
-    // TODO: add options https://issues.apache.org/jira/browse/IGNITE-16059
-    Cursor<IndexRow> range(IndexRowPrefix lowerBound, IndexRowPrefix upperBound);
+    Cursor<IndexRow> range(IndexRowPrefix low, IndexRowPrefix up, Predicate<IndexRow> filter);
 
     /**
      * Removes all data in this index and frees the associated resources.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 094e416..bc33a2a 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.storage.rocksdb;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.columnFamilyType;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
-import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexName;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -35,15 +34,16 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.DataRegion;
 import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.BinaryRowComparator;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.SortedIndexStorageDescriptor;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.NotNull;
@@ -172,11 +172,10 @@ class RocksDbTableStorage implements TableStorage {
                     break;
 
                 case SORTED_INDEX:
-                    String indexName = sortedIndexName(handleName);
-
-                    var indexDescriptor = new SortedIndexDescriptor(indexName, tableCfg.value());
-
-                    sortedIndices.put(indexName, new RocksDbSortedIndexStorage(cf, indexDescriptor));
+                    // TODO: restore index on restart.
+                    //String indexName = sortedIndexName(handleName);
+                    //var indexDescriptor = new MySortedIndexDescriptor(indexName, tableCfg.value());
+                    //sortedIndices.put(indexName, new RocksDbSortedIndexStorage(cf, indexDescriptor));
 
                     break;
 
@@ -289,17 +288,16 @@ class RocksDbTableStorage implements TableStorage {
     }
 
     @Override
-    public SortedIndexStorage getOrCreateSortedIndex(String indexName) {
+    public SortedIndexStorage createSortedIndex(SortedIndexDescriptor desc) {
         assert !stopped : "Storage has been stopped";
 
-        return sortedIndices.computeIfAbsent(indexName, name -> {
-            var indexDescriptor = new SortedIndexDescriptor(name, tableCfg.value());
-
-            ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(indexDescriptor);
+        SortedIndexStorageDescriptor idxStoreDesc = new SortedIndexStorageDescriptor(desc);
+        return sortedIndices.computeIfAbsent(desc.name(), name -> {
+            ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(idxStoreDesc);
 
             ColumnFamily cf = createColumnFamily(sortedIndexCfName(name), cfDescriptor);
 
-            return new RocksDbSortedIndexStorage(cf, indexDescriptor);
+            return new RocksDbSortedIndexStorage(cf, idxStoreDesc);
         });
     }
 
@@ -392,9 +390,13 @@ class RocksDbTableStorage implements TableStorage {
                 return new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8), new ColumnFamilyOptions());
 
             case SORTED_INDEX:
-                var indexDescriptor = new SortedIndexDescriptor(sortedIndexName(cfName), tableCfg.value());
+                // TODO: restore index on restart
+                // var indexDescriptor = new SortedIndexDescriptor(sortedIndexName(cfName), tableCfg.value());
+                // return sortedIndexCfDescriptor(indexDescriptor);
+
+                assert false : "Restore index not supported";
 
-                return sortedIndexCfDescriptor(indexDescriptor);
+                return null;
 
             default:
                 throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.name() + ']');
@@ -411,7 +413,7 @@ class RocksDbTableStorage implements TableStorage {
     /**
      * Creates a Column Family descriptor for a Sorted Index.
      */
-    private static ColumnFamilyDescriptor sortedIndexCfDescriptor(SortedIndexDescriptor descriptor) {
+    private static ColumnFamilyDescriptor sortedIndexCfDescriptor(SortedIndexStorageDescriptor descriptor) {
         String cfName = sortedIndexCfName(descriptor.name());
 
         ColumnFamilyOptions options = new ColumnFamilyOptions().setComparator(new BinaryRowComparator(descriptor));
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java
deleted file mode 100644
index 9cba543..0000000
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb.index;
-
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
-
-/**
- * {@link IndexRowDeserializer} implementation that uses {@link BinaryRow} infrastructure for deserialization purposes.
- */
-class BinaryIndexRowDeserializer implements IndexRowDeserializer {
-    private final SortedIndexDescriptor descriptor;
-
-    BinaryIndexRowDeserializer(SortedIndexDescriptor descriptor) {
-        this.descriptor = descriptor;
-    }
-
-    @Override
-    public Object[] indexedColumnValues(IndexRow indexRow) {
-        var row = new Row(descriptor.asSchemaDescriptor(), new ByteBufferRow(indexRow.rowBytes()));
-
-        return descriptor.indexRowColumns().stream()
-                .filter(ColumnDescriptor::indexedColumn)
-                .map(ColumnDescriptor::column)
-                .map(column -> column.type().spec().objectValue(row, column.schemaIndex()))
-                .toArray();
-    }
-}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java
deleted file mode 100644
index d8a09d6..0000000
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb.index;
-
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowFactory;
-import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-
-/**
- * {@link IndexRowFactory} implementation that uses {@link BinaryRow} as the index keys serialization mechanism.
- */
-class BinaryIndexRowFactory implements IndexRowFactory {
-    private final SortedIndexDescriptor descriptor;
-
-    BinaryIndexRowFactory(SortedIndexDescriptor descriptor) {
-        this.descriptor = descriptor;
-    }
-
-    @Override
-    public IndexRow createIndexRow(Object[] columnValues, SearchRow primaryKey) {
-        if (columnValues.length != descriptor.indexRowColumns().size()) {
-            throw new IllegalArgumentException(String.format(
-                    "Incorrect number of column values passed. Expected %d, got %d",
-                    descriptor.indexRowColumns().size(),
-                    columnValues.length
-            ));
-        }
-
-        RowAssembler rowAssembler = createRowAssembler(columnValues);
-
-        for (Column column : descriptor.asSchemaDescriptor().keyColumns().columns()) {
-            Object columnValue = columnValues[column.columnOrder()];
-
-            RowAssembler.writeValue(rowAssembler, column, columnValue);
-        }
-
-        return new BinaryIndexRow(rowAssembler.build(), primaryKey);
-    }
-
-    @Override
-    public IndexRowPrefix createIndexRowPrefix(Object[] prefixColumnValues) {
-        if (prefixColumnValues.length > descriptor.indexRowColumns().size()) {
-            throw new IllegalArgumentException(String.format(
-                    "Incorrect number of column values passed. Expected not more than %d, got %d",
-                    descriptor.indexRowColumns().size(),
-                    prefixColumnValues.length
-            ));
-        }
-
-        return () -> prefixColumnValues;
-    }
-
-    /**
-     * Creates a {@link RowAssembler} that can later be used to serialized the given column mapping.
-     */
-    private RowAssembler createRowAssembler(Object[] rowColumns) {
-        SchemaDescriptor schemaDescriptor = descriptor.asSchemaDescriptor();
-
-        int nonNullVarlenKeyCols = 0;
-
-        for (Column column : schemaDescriptor.keyColumns().columns()) {
-            Object columnValue = rowColumns[column.columnOrder()];
-
-            if (!column.type().spec().fixedLength() && columnValue != null) {
-                nonNullVarlenKeyCols += 1;
-            }
-        }
-
-        return new RowAssembler(schemaDescriptor, nonNullVarlenKeyCols, 0);
-    }
-}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java
new file mode 100644
index 0000000..437faf3
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.index.IndexRow;
+
+/**
+ * {@link IndexRowSerializer} and {@link IndexRowDeserializer} implementation that uses the {@link BinaryRow} infrastructure.
+ */
+class BinaryIndexRowSerializer implements IndexRowSerializer, IndexRowDeserializer {
+    private final SortedIndexStorageDescriptor descriptor;
+
+    BinaryIndexRowSerializer(SortedIndexStorageDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    @Override
+    public IndexRow deserialize(IndexBinaryRow binRow) {
+        return new IndexRowImpl(binRow.keySlice(), binRow.valueSlice(), descriptor);
+    }
+
+    @Override
+    public IndexBinaryRow serialize(IndexRow row) {
+        RowAssembler rowAssembler = createRowAssembler(row);
+
+        for (Column column : descriptor.schema().keyColumns().columns()) {
+            Object columnValue = row.value(column.columnOrder());
+
+            RowAssembler.writeValue(rowAssembler, column, columnValue);
+        }
+
+        return new IndexBinaryRowImpl(rowAssembler.build(), row.primaryKey());
+    }
+
+    /**
+     * Creates a {@link RowAssembler} that can later be used to serialize the given index row.
+     */
+    private RowAssembler createRowAssembler(IndexRow row) {
+        SchemaDescriptor schemaDescriptor = descriptor.schema();
+
+        int nonNullVarlenKeyCols = 0;
+
+        for (Column column : schemaDescriptor.keyColumns().columns()) {
+            Object columnValue = row.value(column.columnOrder());
+
+            if (!column.type().spec().fixedLength() && columnValue != null) {
+                nonNullVarlenKeyCols += 1;
+            }
+        }
+
+        return new RowAssembler(schemaDescriptor, nonNullVarlenKeyCols, 0);
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
index ad47a27..8c526f2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
 import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.rocksdb.AbstractComparator;
 import org.rocksdb.ComparatorOptions;
 
@@ -53,14 +52,14 @@ public class BinaryRowComparator extends AbstractComparator {
     /**
      * Creates a RocksDB comparator for a Sorted Index identified by the given descriptor.
      */
-    public BinaryRowComparator(SortedIndexDescriptor descriptor) {
+    public BinaryRowComparator(SortedIndexStorageDescriptor descriptor) {
         this(descriptor, new ComparatorOptions());
     }
 
     /**
      * Internal constructor for capturing the {@code options} parameter for resource management purposes.
      */
-    private BinaryRowComparator(SortedIndexDescriptor descriptor, ComparatorOptions options) {
+    private BinaryRowComparator(SortedIndexStorageDescriptor descriptor, ComparatorOptions options) {
         super(options);
 
         innerComparator = comparing(
@@ -74,9 +73,9 @@ public class BinaryRowComparator extends AbstractComparator {
     /**
      * Creates a comparator for comparing two {@link BinaryRow}s by converting them into {@link Row}s.
      */
-    private static Comparator<BinaryRow> binaryRowComparator(SortedIndexDescriptor descriptor) {
+    private static Comparator<BinaryRow> binaryRowComparator(SortedIndexStorageDescriptor descriptor) {
         return comparing(
-                binaryRow -> new Row(descriptor.asSchemaDescriptor(), binaryRow),
+                binaryRow -> new Row(descriptor.schema(), binaryRow),
                 rowComparator(descriptor)
         );
     }
@@ -84,8 +83,8 @@ public class BinaryRowComparator extends AbstractComparator {
     /**
      * Creates a comparator that compares two {@link Row}s by comparing individual columns.
      */
-    private static Comparator<Row> rowComparator(SortedIndexDescriptor descriptor) {
-        return descriptor.indexRowColumns().stream()
+    private static Comparator<Row> rowComparator(SortedIndexStorageDescriptor descriptor) {
+        return descriptor.indexColumns().stream()
                 .map(columnDescriptor -> {
                     Column column = columnDescriptor.column();
 
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
similarity index 68%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
copy to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
index cf860d6..80d5aac 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
 
 /**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
  */
-public interface IndexRowDeserializer {
+public interface IndexBinaryRow {
     /**
-     * De-serializes column values that were used to create the index.
-     *
-     * @param indexRow Index row.
-     * @return Values of the indexed columns.
+     * Returns the bytes of the key chunk.
      */
-    Object[] indexedColumnValues(IndexRow indexRow);
+    byte[] keySlice();
+
+    /**
+     * Returns the bytes of the value chunk.
+     */
+    byte[] valueSlice();
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
similarity index 68%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
index 9f1531b..d902810 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
@@ -19,37 +19,33 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import java.util.Arrays;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 
 /**
- * {@link IndexRow} implementation that uses {@link BinaryRow} serialization.
+ * Implementation of the {@link IndexBinaryRow}.
  */
-class BinaryIndexRow implements IndexRow {
+class IndexBinaryRowImpl implements IndexBinaryRow {
     private final byte[] bytes;
 
-    private final SearchRow pk;
+    private final BinaryRow pk;
 
-    BinaryIndexRow(byte[] bytes, byte[] pkBytes) {
+    /**
+     * Constructor.
+     */
+    IndexBinaryRowImpl(byte[] bytes, byte[] pkBytes) {
         this.bytes = bytes;
-        this.pk = new ByteArraySearchRow(pkBytes);
+        this.pk = new ByteBufferRow(pkBytes);
     }
 
-    BinaryIndexRow(BinaryRow row, SearchRow primaryKey) {
+    /**
+     * Constructor.
+     */
+    IndexBinaryRowImpl(BinaryRow row, BinaryRow pk) {
         this.bytes = row.bytes();
-        this.pk = primaryKey;
-    }
-
-    @Override
-    public byte[] rowBytes() {
-        return bytes;
-    }
-
-    @Override
-    public SearchRow primaryKey() {
-        return pk;
+        this.pk = pk;
     }
 
+    /** {@inheritDoc} */
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -58,12 +54,25 @@ class BinaryIndexRow implements IndexRow {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        BinaryIndexRow that = (BinaryIndexRow) o;
+        IndexBinaryRowImpl that = (IndexBinaryRowImpl) o;
         return Arrays.equals(bytes, that.bytes);
     }
 
+    /** {@inheritDoc} */
     @Override
     public int hashCode() {
         return Arrays.hashCode(bytes);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] keySlice() {
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] valueSlice() {
+        return pk.bytes();
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
similarity index 85%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
copy to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
index cf860d6..afbbe9f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.storage.index.IndexRow;
 
 /**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Class for extracting indexed column values from an {@link IndexBinaryRow}.
  */
 public interface IndexRowDeserializer {
     /**
@@ -27,5 +29,5 @@ public interface IndexRowDeserializer {
      * @param indexRow Index row.
      * @return Values of the indexed columns.
      */
-    Object[] indexedColumnValues(IndexRow indexRow);
+    IndexRow deserialize(IndexBinaryRow indexRow);
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java
new file mode 100644
index 0000000..0ed0d8e
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexRow;
+
+/**
+ * {@link IndexRow} implementation.
+ */
+class IndexRowImpl implements IndexRow {
+    private final Row keyRow;
+
+    private final byte[] valBytes;
+
+    private final SortedIndexStorageDescriptor desc;
+
+    /**
+     * Creates an index row that reads indexed column values from the key bytes using the descriptor's schema.
+     *
+     * @param desc   Index descriptor.
+     */
+    IndexRowImpl(byte[] keyBytes, byte[] valBytes, SortedIndexStorageDescriptor desc) {
+        this.desc = desc;
+
+        keyRow = new Row(desc.schema(), new ByteBufferRow(keyBytes));
+        this.valBytes = valBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Object value(int idxColOrder) {
+        Column c = desc.indexColumns().get(idxColOrder).column();
+
+        switch (c.type().spec()) {
+            case INT8:
+                return keyRow.byteValueBoxed(c.schemaIndex());
+
+            case INT16:
+                return keyRow.shortValueBoxed(c.schemaIndex());
+
+            case INT32:
+                return keyRow.intValueBoxed(c.schemaIndex());
+
+            case INT64:
+                return keyRow.longValueBoxed(c.schemaIndex());
+
+            case FLOAT:
+                return keyRow.floatValueBoxed(c.schemaIndex());
+
+            case DOUBLE:
+                return keyRow.doubleValueBoxed(c.schemaIndex());
+
+            case DECIMAL:
+                return keyRow.decimalValue(c.schemaIndex());
+
+            case UUID:
+                return keyRow.uuidValue(c.schemaIndex());
+
+            case STRING:
+                return keyRow.stringValue(c.schemaIndex());
+
+            case BYTES:
+                return keyRow.bytesValue(c.schemaIndex());
+
+            case BITMASK:
+                return keyRow.bitmaskValue(c.schemaIndex());
+
+            case NUMBER:
+                return keyRow.numberValue(c.schemaIndex());
+
+            case DATE:
+                return keyRow.dateValue(c.schemaIndex());
+
+            case TIME:
+                return keyRow.timeValue(c.schemaIndex());
+
+            case DATETIME:
+                return keyRow.dateTimeValue(c.schemaIndex());
+
+            case TIMESTAMP:
+                return keyRow.timestampValue(c.schemaIndex());
+
+            default:
+                throw new IllegalStateException("Unexpected value: " + c.type().spec());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BinaryRow primaryKey() {
+        return new ByteBufferRow(valBytes);
+    }
+
+    @Override
+    public int partition() {
+        return 0;
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
similarity index 79%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
index cf860d6..9d00c59 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.storage.index.IndexRow;
 
 /**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Class for serializing an {@link IndexRow} into an {@link IndexBinaryRow}.
  */
-public interface IndexRowDeserializer {
+public interface IndexRowSerializer {
     /**
-     * De-serializes column values that were used to create the index.
+     * Serializes index row.
      *
      * @param indexRow Index row.
      * @return Values of the indexed columns.
      */
-    Object[] indexedColumnValues(IndexRow indexRow);
+    IndexBinaryRow serialize(IndexRow indexRow);
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
index 2001d77..414fdfb 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
@@ -19,21 +19,20 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import java.util.Arrays;
 import java.util.BitSet;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Class for comparing a {@link BinaryRow} representing an Index Key with a given prefix of index columns.
  */
 class PrefixComparator {
-    private final SortedIndexDescriptor descriptor;
-    private final @Nullable Object[] prefix;
+    private final SortedIndexStorageDescriptor descriptor;
+    private final @Nullable IndexRowPrefix prefix;
 
     /**
      * Creates a new prefix comparator.
@@ -41,11 +40,11 @@ class PrefixComparator {
      * @param descriptor Index Descriptor of the enclosing index.
      * @param prefix Prefix to compare the incoming rows against.
      */
-    PrefixComparator(SortedIndexDescriptor descriptor, IndexRowPrefix prefix) {
-        assert descriptor.indexRowColumns().size() >= prefix.prefixColumnValues().length;
+    PrefixComparator(SortedIndexStorageDescriptor descriptor, IndexRowPrefix prefix) {
+        assert descriptor.columns().size() >= prefix.length();
 
         this.descriptor = descriptor;
-        this.prefix = prefix.prefixColumnValues();
+        this.prefix = prefix;
     }
 
     /**
@@ -57,12 +56,12 @@ class PrefixComparator {
      *         a value greater than {@code 0} if the row's prefix is larger than the prefix.
      */
     int compare(BinaryRow binaryRow) {
-        var row = new Row(descriptor.asSchemaDescriptor(), binaryRow);
+        var row = new Row(descriptor.schema(), binaryRow);
 
-        for (int i = 0; i < prefix.length; ++i) {
-            ColumnDescriptor columnDescriptor = descriptor.indexRowColumns().get(i);
+        for (int i = 0; i < prefix.length(); ++i) {
+            SortedIndexColumnDescriptor columnDescriptor = descriptor.indexColumns().get(i);
 
-            int compare = compare(columnDescriptor.column(), row, prefix[i]);
+            int compare = compare(columnDescriptor.column(), row, prefix.value(i));
 
             if (compare != 0) {
                 return columnDescriptor.asc() ? compare : -compare;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 6d7b6e5..b55ea8e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -17,15 +17,13 @@
 
 package org.apache.ignite.internal.storage.rocksdb.index;
 
+import java.util.function.Predicate;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
-import org.apache.ignite.internal.storage.index.IndexRowFactory;
 import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -38,72 +36,67 @@ import org.rocksdb.RocksIterator;
 public class RocksDbSortedIndexStorage implements SortedIndexStorage {
     private final ColumnFamily indexCf;
 
-    private final SortedIndexDescriptor descriptor;
-
-    private final IndexRowFactory indexRowFactory;
+    private final SortedIndexStorageDescriptor descriptor;
 
     private final IndexRowDeserializer indexRowDeserializer;
 
+    private final IndexRowSerializer indexRowSerializer;
+
     /**
      * Creates a new Index storage.
      *
      * @param indexCf Column Family for storing the data.
      * @param descriptor Index descriptor.
      */
-    public RocksDbSortedIndexStorage(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
+    public RocksDbSortedIndexStorage(ColumnFamily indexCf, SortedIndexStorageDescriptor descriptor) {
         this.indexCf = indexCf;
         this.descriptor = descriptor;
-        this.indexRowFactory = new BinaryIndexRowFactory(descriptor);
-        this.indexRowDeserializer = new BinaryIndexRowDeserializer(descriptor);
-    }
 
-    @Override
-    public SortedIndexDescriptor indexDescriptor() {
-        return descriptor;
+        BinaryIndexRowSerializer serializer = new BinaryIndexRowSerializer(descriptor);
+        this.indexRowSerializer = serializer;
+        this.indexRowDeserializer = serializer;
     }
 
     @Override
-    public IndexRowFactory indexRowFactory() {
-        return indexRowFactory;
-    }
-
-    @Override
-    public IndexRowDeserializer indexRowDeserializer() {
-        return indexRowDeserializer;
+    public SortedIndexStorageDescriptor indexDescriptor() {
+        return descriptor;
     }
 
     @Override
     public void put(IndexRow row) {
-        assert row.rowBytes().length > 0;
-        assert row.primaryKey().keyBytes().length > 0;
+        assert row.primaryKey().bytes().length > 0;
 
         try {
-            indexCf.put(row.rowBytes(), row.primaryKey().keyBytes());
+            IndexBinaryRow binRow = indexRowSerializer.serialize(row);
+
+            indexCf.put(binRow.keySlice(), binRow.valueSlice());
         } catch (RocksDBException e) {
             throw new StorageException("Error while adding data to Rocks DB", e);
         }
     }
 
     @Override
-    public void remove(IndexRow key) {
+    public void remove(IndexRow row) {
         try {
-            indexCf.delete(key.rowBytes());
+            IndexBinaryRow binRow = indexRowSerializer.serialize(row);
+
+            indexCf.delete(binRow.keySlice());
         } catch (RocksDBException e) {
             throw new StorageException("Error while removing data from Rocks DB", e);
         }
     }
 
     @Override
-    public Cursor<IndexRow> range(IndexRowPrefix lowerBound, IndexRowPrefix upperBound) {
+    public Cursor<IndexRow> range(IndexRowPrefix low, IndexRowPrefix up, Predicate<IndexRow> filter) {
         RocksIterator iter = indexCf.newIterator();
 
         iter.seekToFirst();
 
         return new RocksIteratorAdapter<>(iter) {
             @Nullable
-            private PrefixComparator lowerBoundComparator = new PrefixComparator(descriptor, lowerBound);
+            private PrefixComparator lowerBoundComparator = low != null ? new PrefixComparator(descriptor, low) : null;
 
-            private final PrefixComparator upperBoundComparator = new PrefixComparator(descriptor, upperBound);
+            private final PrefixComparator upperBoundComparator = up != null ? new PrefixComparator(descriptor, up) : null;
 
             @Override
             public boolean hasNext() {
@@ -122,7 +115,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
                         }
                     }
 
-                    return upperBoundComparator.compare(row) <= 0;
+                    return upperBoundComparator == null || upperBoundComparator.compare(row) <= 0;
                 }
 
                 return false;
@@ -130,7 +123,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
 
             @Override
             protected IndexRow decodeEntry(byte[] key, byte[] value) {
-                return new BinaryIndexRow(key, value);
+                return indexRowDeserializer.deserialize(new IndexBinaryRowImpl(key, value));
             }
         };
     }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java
new file mode 100644
index 0000000..35b42bd
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.storage.index.IndexSchemaDescriptor;
+
+/**
+ * Descriptor for creating a Sorted Index Storage.
+ * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+ */
+public class SortedIndexStorageDescriptor extends SortedIndexDescriptor {
+    private final List<SortedIndexColumnDescriptor> idxColumns;
+
+    private final IndexSchemaDescriptor idxSchema;
+
+    /**
+     * Creates an index storage descriptor from the given {@link SortedIndexDescriptor}.
+     */
+    public SortedIndexStorageDescriptor(SortedIndexDescriptor desc) {
+        super(desc.name(), desc.columns(), desc.pkColumns());
+
+        this.idxSchema = new IndexSchemaDescriptor(
+                IntStream.range(0, columns().size())
+                        .mapToObj(i -> columns().get(i).column().copyWithOrder(i))
+                        .toArray(Column[]::new)
+        );
+
+        idxColumns = Arrays.asList(idxSchema.columns()).stream()
+                .sorted(Comparator.comparing(Column::columnOrder))
+                .map(c -> {
+                    SortedIndexColumnDescriptor origDesc = columns().stream()
+                            .filter(origCol -> origCol.column().name().equals(c.name()))
+                            .findAny()
+                            .get();
+
+                    return new SortedIndexColumnDescriptor(c, origDesc.collation());
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Converts this Descriptor into an equivalent {@link SchemaDescriptor}.
+     * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+     *
+     * <p>The resulting {@code SchemaDescriptor} will have empty {@link SchemaDescriptor#valueColumns()} and its
+     * {@link SchemaDescriptor#keyColumns()} will be consistent with the columns returned by {@link #columns()}.
+     */
+    public SchemaDescriptor schema() {
+        return idxSchema;
+    }
+
+    /**
+     * Index columns with {@link Column#schemaIndex()} specified for index storage schema.
+     * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+     */
+    public List<SortedIndexColumnDescriptor> indexColumns() {
+        return idxColumns;
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
index 7539ea0..779aec1 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
@@ -30,12 +30,13 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.function.Function;
 import java.util.stream.IntStream;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -45,16 +46,16 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
     /**
      * Values used to create the Index row.
      */
-    private final Object[] columns;
+    private final Tuple row;
 
-    private final IndexRow row;
+    private final IndexRow idxRow;
 
-    private final SortedIndexDescriptor descriptor;
+    private final SortedIndexStorageDescriptor desc;
 
-    IndexRowWrapper(SortedIndexStorage storage, IndexRow row, Object[] columns) {
-        this.descriptor = storage.indexDescriptor();
+    IndexRowWrapper(SortedIndexStorage storage, IndexRow idxRow, Tuple row) {
+        this.desc = (SortedIndexStorageDescriptor) storage.indexDescriptor();
+        this.idxRow = idxRow;
         this.row = row;
-        this.columns = columns;
     }
 
     /**
@@ -63,48 +64,49 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
     static IndexRowWrapper randomRow(SortedIndexStorage indexStorage) {
         var random = new Random();
 
-        Object[] columns = indexStorage.indexDescriptor().indexRowColumns().stream()
-                .map(ColumnDescriptor::column)
-                .map(column -> generateRandomValue(random, column.type()))
-                .toArray();
+        Tuple row = Tuple.create();
 
-        var primaryKey = new ByteArraySearchRow(randomBytes(random, 25));
+        ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().stream()
+                .map(SortedIndexColumnDescriptor::column)
+                .forEach(column -> row.set(column.name(), generateRandomValue(random, column.type())));
 
-        IndexRow row = indexStorage.indexRowFactory().createIndexRow(columns, primaryKey);
+        var primaryKey = new ByteBufferRow(randomBytes(random, 25));
 
-        return new IndexRowWrapper(indexStorage, row, columns);
+        return new IndexRowWrapper(indexStorage, new TestIndexRow(row, primaryKey, 0), row);
     }
 
     /**
      * Creates an Index Key prefix of the given length.
      */
     IndexRowPrefix prefix(int length) {
-        return () -> Arrays.copyOf(columns, length);
-    }
-
-    IndexRow row() {
-        return row;
-    }
+        return new IndexRowPrefix() {
+            @Override
+            public Object value(int idxColOrder) {
+                return idxRow.value(idxColOrder);
+            }
 
-    Object[] columns() {
-        return columns;
+            @Override
+            public int length() {
+                return length;
+            }
+        };
     }
 
     @Override
     public int compareTo(@NotNull IndexRowWrapper o) {
-        int sizeCompare = Integer.compare(columns.length, o.columns.length);
+        int sizeCompare = Integer.compare(row.columnCount(), o.row.columnCount());
 
         if (sizeCompare != 0) {
             return sizeCompare;
         }
 
-        for (int i = 0; i < columns.length; ++i) {
-            Comparator<Object> comparator = comparator(columns[i].getClass());
+        for (int i = 0; i < row.columnCount(); ++i) {
+            Comparator<Object> comparator = comparator(row.value(i).getClass());
 
-            int compare = comparator.compare(columns[i], o.columns[i]);
+            int compare = comparator.compare(row.value(i), o.row.value(i));
 
             if (compare != 0) {
-                boolean asc = descriptor.indexRowColumns().get(i).asc();
+                boolean asc = desc.indexColumns().get(i).asc();
 
                 return asc ? compare : -compare;
             }
@@ -122,12 +124,12 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
             return false;
         }
         IndexRowWrapper that = (IndexRowWrapper) o;
-        return row.equals(that.row);
+        return idxRow.equals(that.idxRow);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(row);
+        return Objects.hash(idxRow);
     }
 
     /**
@@ -154,4 +156,11 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
                 .map(Comparable.class::cast)
                 .toArray(Comparable[]::new);
     }
+
+    /**
+     * Returns index row.
+     */
+    public IndexRow row() {
+        return idxRow;
+    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 768e189..7095438 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -17,26 +17,20 @@
 
 package org.apache.ignite.internal.storage.rocksdb.index;
 
-import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
 import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
 import static org.apache.ignite.internal.schema.SchemaTestUtils.generateRandomValue;
 import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.schema.SchemaBuilders.column;
 import static org.apache.ignite.schema.SchemaBuilders.tableBuilder;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -45,6 +39,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
@@ -55,13 +50,16 @@ import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSch
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
 import org.apache.ignite.internal.storage.engine.DataRegion;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.testframework.VariableSource;
@@ -70,15 +68,10 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.ColumnType;
 import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder;
-import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder.SortedIndexColumnBuilder;
-import org.apache.ignite.schema.definition.index.ColumnarIndexDefinition;
-import org.apache.ignite.schema.definition.index.HashIndexDefinition;
-import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
+import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -99,24 +92,72 @@ public class RocksDbSortedIndexStorageTest {
      * Definitions of all supported column types.
      */
     private static final List<ColumnDefinition> ALL_TYPES_COLUMN_DEFINITIONS = allTypesColumnDefinitions();
-
+    /**
+     * List of resources that need to be closed at the end of each test.
+     */
+    private final List<AutoCloseable> resources = new ArrayList<>();
     private Random random;
-
     @InjectConfiguration(polymorphicExtensions = {
             HashIndexConfigurationSchema.class,
             SortedIndexConfigurationSchema.class
     })
     private TableConfiguration tableCfg;
-
     /**
      * Table Storage for creating indices.
      */
     private TableStorage tableStorage;
 
+    private static List<ColumnDefinition> allTypesColumnDefinitions() {
+        Stream<ColumnType> allColumnTypes = Stream.of(
+                ColumnType.INT8,
+                ColumnType.INT16,
+                ColumnType.INT32,
+                ColumnType.INT64,
+                ColumnType.FLOAT,
+                ColumnType.DOUBLE,
+                ColumnType.UUID,
+                ColumnType.DATE,
+                ColumnType.bitmaskOf(32),
+                ColumnType.string(),
+                ColumnType.blobOf(),
+                ColumnType.numberOf(),
+                ColumnType.decimalOf(),
+                ColumnType.time(),
+                ColumnType.datetime(),
+                ColumnType.timestamp()
+        );
+
+        return allColumnTypes
+                .map(type -> column(type.typeSpec().name(), type).asNullable(true).build())
+                .collect(toUnmodifiableList());
+    }
+
     /**
-     * List of resources that need to be closed at the end of each test.
+     * Extracts all data from a given cursor and closes it.
      */
-    private final List<AutoCloseable> resources = new ArrayList<>();
+    private static <T> List<T> cursorToList(Cursor<T> cursor) throws Exception {
+        try (cursor) {
+            var list = new ArrayList<T>();
+
+            cursor.forEachRemaining(list::add);
+
+            return list;
+        }
+    }
+
+    /**
+     * Extracts a single value by a given key or {@code null} if it does not exist.
+     */
+    @Nullable
+    private static IndexRow getSingle(SortedIndexStorage indexStorage, IndexRowWrapper entry) throws Exception {
+        IndexRowPrefix fullPrefix = entry.prefix(((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size());
+
+        List<IndexRow> values = cursorToList(indexStorage.range(fullPrefix, fullPrefix, r -> true));
+
+        assertThat(values, anyOf(empty(), hasSize(1)));
+
+        return values.isEmpty() ? null : values.get(0);
+    }
 
     @BeforeEach
     void setUp(
@@ -174,31 +215,6 @@ public class RocksDbSortedIndexStorageTest {
         assertThat(createTableFuture, willBe(nullValue(Void.class)));
     }
 
-    private static List<ColumnDefinition> allTypesColumnDefinitions() {
-        Stream<ColumnType> allColumnTypes = Stream.of(
-                ColumnType.INT8,
-                ColumnType.INT16,
-                ColumnType.INT32,
-                ColumnType.INT64,
-                ColumnType.FLOAT,
-                ColumnType.DOUBLE,
-                ColumnType.UUID,
-                ColumnType.DATE,
-                ColumnType.bitmaskOf(32),
-                ColumnType.string(),
-                ColumnType.blobOf(),
-                ColumnType.numberOf(),
-                ColumnType.decimalOf(),
-                ColumnType.time(),
-                ColumnType.datetime(),
-                ColumnType.timestamp()
-        );
-
-        return allColumnTypes
-                .map(type -> column(type.typeSpec().name(), type).asNullable(true).build())
-                .collect(toUnmodifiableList());
-    }
-
     @AfterEach
     void tearDown() throws Exception {
         Collections.reverse(resources);
@@ -213,16 +229,21 @@ public class RocksDbSortedIndexStorageTest {
     void testRowSerialization() {
         SortedIndexStorage indexStorage = createIndex(ALL_TYPES_COLUMN_DEFINITIONS);
 
-        Object[] columns = indexStorage.indexDescriptor().indexRowColumns().stream()
-                .map(ColumnDescriptor::column)
-                .map(column -> generateRandomValue(random, column.type()))
-                .toArray();
+        Tuple tuple = Tuple.create();
+        ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().stream()
+                .sequential()
+                .map(SortedIndexColumnDescriptor::column)
+                .forEach(column -> tuple.set(column.name(), generateRandomValue(random, column.type())));
+
+        BinaryIndexRowSerializer ser = new BinaryIndexRowSerializer((SortedIndexStorageDescriptor) indexStorage.indexDescriptor());
 
-        IndexRow row = indexStorage.indexRowFactory().createIndexRow(columns, new ByteArraySearchRow(new byte[0]));
+        IndexBinaryRow binRow = ser.serialize(new TestIndexRow(tuple, new ByteBufferRow(new byte[]{(byte) 0}), 0));
 
-        Object[] actual = indexStorage.indexRowDeserializer().indexedColumnValues(row);
+        IndexRow idxRow = ser.deserialize(binRow);
 
-        assertThat(actual, is(equalTo(columns)));
+        for (int i = 0; i < ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size(); ++i) {
+            assertThat(tuple.value(i), is(equalTo(idxRow.value(i))));
+        }
     }
 
     /**
@@ -260,7 +281,7 @@ public class RocksDbSortedIndexStorageTest {
                     return entry;
                 })
                 .sorted()
-                .collect(toList());
+                .collect(Collectors.toList());
 
         int firstIndex = 3;
         int lastIndex = 8;
@@ -268,17 +289,17 @@ public class RocksDbSortedIndexStorageTest {
         List<byte[]> expected = entries.stream()
                 .skip(firstIndex)
                 .limit(lastIndex - firstIndex + 1)
-                .map(e -> e.row().primaryKey().keyBytes())
-                .collect(toList());
+                .map(e -> e.row().primaryKey().bytes())
+                .collect(Collectors.toList());
 
         IndexRowPrefix first = entries.get(firstIndex).prefix(3);
         IndexRowPrefix last = entries.get(lastIndex).prefix(5);
 
-        List<byte[]> actual = cursorToList(indexStorage.range(first, last))
+        List<byte[]> actual = cursorToList(indexStorage.range(first, last, r -> true))
                 .stream()
                 .map(IndexRow::primaryKey)
-                .map(SearchRow::keyBytes)
-                .collect(toList());
+                .map(BinaryRow::bytes)
+                .collect(Collectors.toList());
 
         assertThat(actual, hasSize(lastIndex - firstIndex + 1));
 
@@ -308,85 +329,11 @@ public class RocksDbSortedIndexStorageTest {
         indexStorage.put(entry1.row());
         indexStorage.put(entry2.row());
 
-        List<IndexRow> actual = cursorToList(indexStorage.range(entry2::columns, entry1::columns));
-
-        assertThat(actual, is(empty()));
-    }
-
-    /**
-     * Tests creating a index that has not been created through the Configuration framework.
-     */
-    @Test
-    void testCreateMissingIndex() {
-        StorageException ex = assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex("does not exist"));
-
-        assertThat(ex.getMessage(), is(equalTo("Index configuration for \"does not exist\" could not be found")));
-    }
-
-    /**
-     * Tests creating a Sorted Index that has been misconfigured as a Hash Index.
-     */
-    @Test
-    void testCreateMisconfiguredIndex() {
-        HashIndexDefinition definition = SchemaBuilders.hashIndex("wrong type")
-                .withColumns("foo")
-                .build();
-
-        StorageException ex = assertThrows(StorageException.class, () -> createIndex(definition));
-
-        assertThat(ex.getMessage(), is(equalTo("Index \"WRONG TYPE\" is not configured as a Sorted Index. Actual type: HASH")));
-    }
-
-    /**
-     * Tests the {@link TableStorage#dropIndex} functionality.
-     */
-    @Test
-    void testDropIndex() throws Exception {
-        SortedIndexStorage storage = createIndex(ALL_TYPES_COLUMN_DEFINITIONS.subList(0, 1));
-
-        String indexName = storage.indexDescriptor().name();
-
-        assertThat(tableStorage.getOrCreateSortedIndex(indexName), is(sameInstance(storage)));
-
-        IndexRowWrapper entry = IndexRowWrapper.randomRow(storage);
-
-        storage.put(entry.row());
-
-        tableStorage.dropIndex(indexName);
-
-        SortedIndexStorage nextStorage = tableStorage.getOrCreateSortedIndex(indexName);
-
-        assertThat(nextStorage, is(not(sameInstance(storage))));
-        assertThat(getSingle(nextStorage, entry), is(nullValue()));
-    }
-
-    @ParameterizedTest
-    @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
-    void testNullValues(ColumnDefinition columnDefinition) throws Exception {
-        SortedIndexStorage storage = createIndex(List.of(columnDefinition));
-
-        IndexRowWrapper entry1 = IndexRowWrapper.randomRow(storage);
-
-        Object[] nullArray = storage.indexDescriptor().indexRowColumns().stream()
-                .map(columnDescriptor -> columnDescriptor.indexedColumn() ? null : (byte) random.nextInt())
-                .toArray();
-
-        IndexRow nullRow = storage.indexRowFactory().createIndexRow(nullArray, new ByteArraySearchRow(randomBytes(random, 10)));
-
-        IndexRowWrapper entry2 = new IndexRowWrapper(storage, nullRow, nullArray);
-
-        storage.put(entry1.row());
-        storage.put(entry2.row());
+        int colCount = ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size();
 
-        if (entry1.compareTo(entry2) > 0) {
-            IndexRowWrapper t = entry2;
-            entry2 = entry1;
-            entry1 = t;
-        }
-
-        List<IndexRow> rows = cursorToList(storage.range(entry1::columns, entry2::columns));
+        List<IndexRow> actual = cursorToList(indexStorage.range(entry2.prefix(colCount), entry1.prefix(colCount), r -> true));
 
-        assertThat(rows, contains(entry1.row(), entry2.row()));
+        assertThat(actual, is(empty()));
     }
 
     private List<ColumnDefinition> shuffledRandomDefinitions() {
@@ -400,12 +347,12 @@ public class RocksDbSortedIndexStorageTest {
     private List<ColumnDefinition> shuffledDefinitions(Predicate<ColumnDefinition> filter) {
         List<ColumnDefinition> shuffledDefinitions = ALL_TYPES_COLUMN_DEFINITIONS.stream()
                 .filter(filter)
-                .collect(toList());
+                .collect(Collectors.toList());
 
         Collections.shuffle(shuffledDefinitions, random);
 
         if (log.isInfoEnabled()) {
-            List<String> columnNames = shuffledDefinitions.stream().map(ColumnDefinition::name).collect(toList());
+            List<String> columnNames = shuffledDefinitions.stream().map(ColumnDefinition::name).collect(Collectors.toList());
 
             log.info("Creating index with the following column order: " + columnNames);
         }
@@ -432,13 +379,13 @@ public class RocksDbSortedIndexStorageTest {
         indexStorage.put(entry2.row());
 
         assertThat(
-                getSingle(indexStorage, entry1).primaryKey().keyBytes(),
-                is(equalTo(entry1.row().primaryKey().keyBytes()))
+                getSingle(indexStorage, entry1).primaryKey().bytes(),
+                is(equalTo(entry1.row().primaryKey().bytes()))
         );
 
         assertThat(
-                getSingle(indexStorage, entry2).primaryKey().keyBytes(),
-                is(equalTo(entry2.row().primaryKey().keyBytes()))
+                getSingle(indexStorage, entry2).primaryKey().bytes(),
+                is(equalTo(entry2.row().primaryKey().bytes()))
         );
 
         indexStorage.remove(entry1.row());
@@ -450,62 +397,17 @@ public class RocksDbSortedIndexStorageTest {
      * Creates a Sorted Index using the given columns.
      */
     private SortedIndexStorage createIndex(List<ColumnDefinition> indexSchema) {
-        SortedIndexDefinitionBuilder indexDefinitionBuilder = SchemaBuilders.sortedIndex("foo");
-
-        indexSchema.forEach(column -> {
-            SortedIndexColumnBuilder columnBuilder = indexDefinitionBuilder.addIndexColumn(column.name());
-
-            if (random.nextBoolean()) {
-                columnBuilder.asc();
-            } else {
-                columnBuilder.desc();
-            }
-
-            columnBuilder.done();
-        });
-
-        SortedIndexDefinition indexDefinition = indexDefinitionBuilder.build();
-
-        return createIndex(indexDefinition);
-    }
-
-    /**
-     * Creates a Sorted Index using the given index definition.
-     */
-    private SortedIndexStorage createIndex(ColumnarIndexDefinition indexDefinition) {
-        CompletableFuture<Void> createIndexFuture = tableCfg.change(cfg ->
-                cfg.changeIndices(idxList ->
-                        idxList.create(indexDefinition.name(), idx -> convert(indexDefinition, idx))));
-
-        assertThat(createIndexFuture, willBe(nullValue(Void.class)));
-
-        return tableStorage.getOrCreateSortedIndex(indexDefinition.name());
-    }
-
-    /**
-     * Extracts all data from a given cursor and closes it.
-     */
-    private static <T> List<T> cursorToList(Cursor<T> cursor) throws Exception {
-        try (cursor) {
-            var list = new ArrayList<T>();
+        List<SortedIndexColumnDescriptor> cols = new ArrayList<>();
 
-            cursor.forEachRemaining(list::add);
+        for (int i = 0; i < indexSchema.size(); ++i) {
+            ColumnDefinition colDef = indexSchema.get(i);
 
-            return list;
+            Column col = new Column(colDef.name(), SchemaDescriptorConverter.convert(colDef.type()), true);
+            cols.add(new SortedIndexColumnDescriptor(col, random.nextBoolean()));
         }
-    }
-
-    /**
-     * Extracts a single value by a given key or {@code null} if it does not exist.
-     */
-    @Nullable
-    private static IndexRow getSingle(SortedIndexStorage indexStorage, IndexRowWrapper entry) throws Exception {
-        IndexRowPrefix fullPrefix = entry::columns;
 
-        List<IndexRow> values = cursorToList(indexStorage.range(fullPrefix, fullPrefix));
-
-        assertThat(values, anyOf(empty(), hasSize(1)));
-
-        return values.isEmpty() ? null : values.get(0);
+        return tableStorage.createSortedIndex(
+                new SortedIndexDescriptor("foo", cols, new Column[] {cols.get(0).column()})
+        );
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
similarity index 50%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java
rename to modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
index bbb1396..8e98cdb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
@@ -15,22 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
 
-import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.table.Tuple;
 
 /**
- * Temporary API for creating Index rows from a list of column values. All columns must be sorted according to the index columns order,
- * specified by the {@link SortedIndexDescriptor#indexRowColumns()}.
+ * Index row for tests.
  */
-public interface IndexRowFactory {
-    /**
-     * Creates an Index row from a list of column values.
-     */
-    IndexRow createIndexRow(Object[] columnValues, SearchRow primaryKey);
+public class TestIndexRow implements IndexRow {
+    private final Tuple tuple;
+
+    private final BinaryRow pk;
+
+    private final int part;
 
     /**
-     * Creates an Prefix row from a list of column values.
+     * Constructor.
      */
-    IndexRowPrefix createIndexRowPrefix(Object[] prefixColumnValues);
+    public TestIndexRow(Tuple t, BinaryRow pk, int part) {
+        this.tuple = t;
+        this.pk = pk;
+        this.part = part;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BinaryRow primaryKey() {
+        return pk;
+    }
+
+    @Override
+    public int partition() {
+        return part;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Object value(int idxColOrder) {
+        return tuple.value(idxColOrder);
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
similarity index 50%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
index cda9d87..5e26181 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
@@ -15,40 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.schema;
+package org.apache.ignite.internal.table;
 
-import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Ignite scannable index.
+ * Listener that is notified about row changes in the storage.
  */
-public class IgniteIndex {
-    private final RelCollation collation;
-
-    private final String idxName;
-
-    //    private final GridIndex<H2Row> idx;
-    private final InternalIgniteTable tbl;
+public interface StorageRowListener {
+    StorageRowListener NO_OP = new StorageRowListener() {
+        @Override
+        public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+            // No-op.
+        }
+
+        @Override
+        public void onRemove(BinaryRow row, int partId) {
+            // No-op.
+        }
+    };
 
     /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Called when row is updated.
+     *
+     * @param oldRow Old row.
+     * @param newRow New row.
      */
-    public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
-        this.collation = collation;
-        idxName = name;
-        this.tbl = tbl;
-    }
-
-    public RelCollation collation() {
-        return collation;
-    }
+    void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId);
 
-    public String name() {
-        return idxName;
-    }
-
-    public InternalIgniteTable table() {
-        return tbl;
-    }
+    /**
+     * Called when row is removed.
+     *
+     * @param row Removed row.
+     */
+    void onRemove(BinaryRow row, int partId);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3687f78..9788386 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -17,8 +17,12 @@
 
 package org.apache.ignite.internal.table;
 
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -30,18 +34,21 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Table view implementation for binary objects.
  */
-public class TableImpl implements Table {
+public class TableImpl implements Table, StorageRowListener {
     /** Internal table. */
     private final InternalTable tbl;
 
     /** Schema registry. */
     private final SchemaRegistry schemaReg;
 
+    private final Set<StorageRowListener> rowLstns = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
     /**
      * Constructor.
      *
@@ -123,4 +130,22 @@ public class TableImpl implements Table {
             throw new IgniteInternalException(e);
         }
     }
+
+    public void addRowListener(StorageRowListener lsnr) {
+        rowLstns.add(lsnr);
+    }
+
+    public void removeRowListener(StorageRowListener lsnr) {
+        rowLstns.remove(lsnr);
+    }
+
+    @Override
+    public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+        rowLstns.forEach(l -> l.onUpdate(oldRow, newRow, partId));
+    }
+
+    @Override
+    public void onRemove(BinaryRow row, int partId) {
+        rowLstns.forEach(l -> l.onRemove(row, partId));
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java
new file mode 100644
index 0000000..f5379be
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Forwards storage row changes to a delegate listener (e.g. {@link TableImpl}); changes are dropped until one is set.
+ */
+public class TableStorageRowListener implements StorageRowListener {
+    private volatile StorageRowListener delegate; // Written by listen(), read by the callbacks, possibly on other threads.
+
+    /** {@inheritDoc} */
+    @Override
+    public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+        if (delegate != null) {
+            delegate.onUpdate(oldRow, newRow, partId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onRemove(BinaryRow row, int partId) {
+        if (delegate != null) {
+            delegate.onRemove(row, partId);
+        }
+    }
+
+    /**
+     * Sets the delegate that receives all subsequent storage changes.
+     */
+    public void listen(StorageRowListener lsnr) {
+        delegate = lsnr;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7f3edc1..8743a97 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -59,9 +59,9 @@ import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguratio
 import org.apache.ignite.internal.configuration.schema.ExtendedTableView;
 import org.apache.ignite.internal.configuration.schema.SchemaConfiguration;
 import org.apache.ignite.internal.configuration.schema.SchemaView;
+import org.apache.ignite.internal.manager.AbstractProducer;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaException;
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableStorageRowListener;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
@@ -103,7 +104,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Table manager.
  */
-public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
+public class TableManager extends AbstractProducer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
         IgniteComponent {
     /** The logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
@@ -319,7 +320,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                             toAdd.removeAll(oldPartitionAssignment);
 
-                            InternalTable internalTable = tablesById.get(tblId).internalTable();
+                            TableImpl tbl = tablesById.get(tblId);
+                            InternalTable internalTable = tbl.internalTable();
 
                             // Create new raft nodes according to new assignments.
 
@@ -329,7 +331,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                         newPartitionAssignment,
                                         toAdd,
                                         () -> new PartitionListener(tblId,
-                                                new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
+                                                new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager, tbl))
                                 ).thenAccept(
                                         updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
                                                 partId, updatedRaftGroupService)
@@ -477,6 +479,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         tableStorage.start();
 
+        TableStorageRowListener storageUpdLsnr = new TableStorageRowListener();
+
         for (int p = 0; p < partitions; p++) {
             int partId = p;
 
@@ -486,7 +490,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                 raftGroupName(tblId, p),
                                 assignment.get(p),
                                 () -> new PartitionListener(tblId,
-                                        new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
+                                        new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager, storageUpdLsnr))
                         )
                 );
             } catch (NodeStoppingException e) {
@@ -540,6 +544,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         schemaRegistry
                 );
 
+                storageUpdLsnr.listen(table);
+
                 tables.put(name, table);
                 tablesById.put(tblId, table);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index 4b76ccd..5c3e29c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.SearchRow;
 import org.apache.ignite.internal.storage.basic.BinarySearchRow;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.StorageRowListener;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
@@ -51,6 +52,9 @@ public class VersionedRowStore {
     /** Storage delegate. */
     private final PartitionStorage storage;
 
+    /** Update listener. */
+    private final StorageRowListener lsnr;
+
     /** Transaction manager. */
     private TxManager txManager;
 
@@ -61,8 +65,19 @@ public class VersionedRowStore {
      * @param txManager The TX manager.
      */
     public VersionedRowStore(@NotNull PartitionStorage storage, @NotNull TxManager txManager) {
+        this(storage, txManager, StorageRowListener.NO_OP);
+    }
+
+    /**
+     * The constructor with a row listener ({@code lsnr}, must not be null) notified on row updates and removals.
+     *
+     * @param storage The storage.
+     * @param txManager The TX manager.
+     */
+    public VersionedRowStore(@NotNull PartitionStorage storage, @NotNull TxManager txManager, StorageRowListener lsnr) {
         this.storage = Objects.requireNonNull(storage);
         this.txManager = Objects.requireNonNull(txManager);
+        this.lsnr = Objects.requireNonNull(lsnr);
     }
 
     /**
@@ -127,6 +142,8 @@ public class VersionedRowStore {
         Pair<BinaryRow, BinaryRow> pair = resolve(unpack(storage.read(key)), ts);
 
         storage.write(pack(key, new Value(row, pair.getSecond(), ts)));
+
+        lsnr.onUpdate(pair.getSecond(), row, storage.partitionId());
     }
 
     /**
@@ -168,6 +185,8 @@ public class VersionedRowStore {
         // Write a tombstone.
         storage.write(pack(key, new Value(null, pair.getSecond(), ts)));
 
+        lsnr.onRemove(pair.getSecond(), storage.partitionId());
+
         return true;
     }
 
@@ -205,6 +224,8 @@ public class VersionedRowStore {
 
         storage.write(pack(key, new Value(row, null, ts)));
 
+        lsnr.onUpdate(null, row, storage.partitionId());
+
         return true;
     }
 
diff --git a/parent/pom.xml b/parent/pom.xml
index 0d90766..9416062 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -312,6 +312,18 @@
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.ignite</groupId>
+                <artifactId>ignite-index</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.ignite</groupId>
+                <artifactId>ignite-index-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
             <!-- 3rd party dependencies -->
             <dependency>
                 <groupId>org.jetbrains</groupId>
diff --git a/pom.xml b/pom.xml
index ac1f885..57cceab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,8 @@
         <module>modules/configuration-annotation-processor</module>
         <module>modules/configuration-api</module>
         <module>modules/core</module>
+        <module>modules/index</module>
+        <module>modules/index-api</module>
         <module>modules/marshaller-common</module>
         <module>modules/metastorage</module>
         <module>modules/metastorage-client</module>