You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/10/21 13:15:36 UTC

[ignite-3] 02/02: IGNITE-17820 SQL. Add native support for SEARCH/SARG operator (#1176)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 5200823421e46924464e1f03fe9b18a7fbc3aeda
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Tue Oct 18 16:10:18 2022 +0300

    IGNITE-17820 SQL. Add native support for SEARCH/SARG operator (#1176)
---
 .../sql/engine/exec/AbstractIndexScan.java         |  62 +--
 .../sql/engine/exec/LogicalRelImplementor.java     |  28 +-
 .../sql/engine/exec/RuntimeSortedIndex.java        |  62 ++-
 .../ignite/internal/sql/engine/exec/TreeIndex.java |   4 +-
 .../sql/engine/exec/exp/ExpressionFactory.java     |  13 +
 .../sql/engine/exec/exp/ExpressionFactoryImpl.java | 261 +++++++++++-
 .../{TreeIndex.java => exp/RangeCondition.java}    |  29 +-
 .../{TreeIndex.java => exp/RangeIterable.java}     |  20 +-
 .../sql/engine/exec/rel/IndexScanNode.java         |  97 +++--
 .../sql/engine/exec/rel/IndexSpoolNode.java        |   7 +-
 .../sql/engine/externalize/RelInputEx.java         |  10 +
 .../internal/sql/engine/externalize/RelJson.java   |  61 ++-
 .../sql/engine/externalize/RelJsonReader.java      |   7 +
 .../sql/engine/metadata/IgniteMdSelectivity.java   |  40 +-
 .../sql/engine/prepare/IgnitePrograms.java         |   2 +
 .../sql/engine/prepare/bounds/ExactBounds.java     |  74 ++++
 .../sql/engine/prepare/bounds/MultiBounds.java     |  76 ++++
 .../sql/engine/prepare/bounds/RangeBounds.java     | 126 ++++++
 .../sql/engine/prepare/bounds/SearchBounds.java    |  66 +++
 .../internal/sql/engine/rel/AbstractIndexScan.java |  80 +---
 .../internal/sql/engine/rel/IgniteIndexScan.java   |  16 +-
 .../sql/engine/rel/IgniteSortedIndexSpool.java     |  30 +-
 .../engine/rel/ProjectableFilterableTableScan.java |   9 +-
 .../engine/rel/logical/IgniteLogicalIndexScan.java |  22 +-
 .../FilterSpoolMergeToSortedIndexSpoolRule.java    |  46 +--
 .../sql/engine/rule/LogicalScanConverterRule.java  |   2 +-
 .../engine/rule/logical/FilterScanMergeRule.java   |  18 +-
 .../internal/sql/engine/util/IndexConditions.java  | 151 -------
 .../ignite/internal/sql/engine/util/RexUtils.java  | 449 +++++++++++++--------
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   2 +-
 .../exec/rel/IndexScanNodeExecutionTest.java       |  55 ++-
 .../exec/rel/SortedIndexSpoolExecutionTest.java    |  54 ++-
 .../CorrelatedNestedLoopJoinPlannerTest.java       |  28 +-
 .../planner/ProjectFilterScanMergePlannerTest.java |  24 +-
 .../planner/SortedIndexSpoolPlannerTest.java       |  85 ++--
 35 files changed, 1403 insertions(+), 713 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
index 18a03e4d64..2779dee515 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import com.google.common.collect.Streams;
 import java.util.Iterator;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.FilteringIterator;
 import org.apache.ignite.internal.util.TransformingIterator;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -35,11 +37,8 @@ public abstract class AbstractIndexScan<RowT, IdxRowT> implements Iterable<RowT>
     /** Additional filters. */
     private final Predicate<RowT> filters;
 
-    /** Lower index scan bound. */
-    private final Supplier<RowT> lowerBound;
-
-    /** Upper index scan bound. */
-    private final Supplier<RowT> upperBound;
+    /** Index scan bounds. */
+    private final RangeIterable<RowT> ranges;
 
     private final Function<RowT, RowT> rowTransformer;
 
@@ -50,12 +49,11 @@ public abstract class AbstractIndexScan<RowT, IdxRowT> implements Iterable<RowT>
     /**
      * Constructor.
      *
-     * @param ectx       Execution context.
-     * @param rowType    Rel data type.
-     * @param idx        Physical index.
-     * @param filters    Additional filters.
-     * @param lowerBound Lower index scan bound.
-     * @param upperBound Upper index scan bound.
+     * @param ectx Execution context.
+     * @param rowType Rel data type.
+     * @param idx Physical index.
+     * @param filters Additional filters.
+     * @param ranges Index scan bounds.
      * @param rowTransformer Row transformer.
      */
     protected AbstractIndexScan(
@@ -63,37 +61,39 @@ public abstract class AbstractIndexScan<RowT, IdxRowT> implements Iterable<RowT>
             RelDataType rowType,
             TreeIndex<IdxRowT> idx,
             Predicate<RowT> filters,
-            Supplier<RowT> lowerBound,
-            Supplier<RowT> upperBound,
+            RangeIterable<RowT> ranges,
             Function<RowT, RowT> rowTransformer
     ) {
         this.ectx = ectx;
         this.rowType = rowType;
         this.idx = idx;
         this.filters = filters;
-        this.lowerBound = lowerBound;
-        this.upperBound = upperBound;
+        this.ranges = ranges;
         this.rowTransformer = rowTransformer;
     }
 
     /** {@inheritDoc} */
     @Override
     public synchronized Iterator<RowT> iterator() {
-        IdxRowT lower = lowerBound == null ? null : row2indexRow(lowerBound.get());
-        IdxRowT upper = upperBound == null ? null : row2indexRow(upperBound.get());
-
-        Iterator<RowT> it = new TransformingIterator<>(
-                idx.find(lower, upper),
-                this::indexRow2Row
-        );
-
-        it = new FilteringIterator<>(it, filters);
-
-        if (rowTransformer != null) {
-            it = new TransformingIterator<>(it, rowTransformer);
-        }
-
-        return it;
+        Iterable<RowT>[] iterables = Streams.stream(ranges)
+                .map(range -> new Iterable<RowT>() {
+                            @Override
+                            public Iterator<RowT> iterator() {
+                                Iterator<RowT> it = new TransformingIterator<>(
+                                        idx.find(row2indexRow(range.lower()), row2indexRow(range.upper()), range.lowerInclude(),
+                                                range.upperInclude()),
+                                        AbstractIndexScan.this::indexRow2Row
+                                );
+
+                                it = new FilteringIterator<>(it, filters);
+
+                                return (rowTransformer != null) ? new TransformingIterator<>(it, rowTransformer) : it;
+                            }
+                        }
+                )
+                .toArray(Iterable[]::new);
+
+        return CollectionUtils.concat(iterables).iterator();
     }
 
     protected abstract IdxRowT row2indexRow(RowT bound);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index b7d260d9a8..dc9208d9a7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -42,6 +42,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.TableSpoolNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.UnionAllNode;
 import org.apache.ignite.internal.sql.engine.metadata.AffinityService;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
@@ -126,10 +128,10 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /**
      * Constructor.
      *
-     * @param ctx             Root context.
-     * @param affSrvc         Affinity service.
+     * @param ctx Root context.
+     * @param affSrvc Affinity service.
      * @param mailboxRegistry Mailbox registry.
-     * @param exchangeSvc     Exchange service.
+     * @param exchangeSvc Exchange service.
      */
     public LogicalRelImplementor(
             ExecutionContext<RowT> ctx,
@@ -290,21 +292,20 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
         ImmutableBitSet requiredColumns = rel.requiredColumns();
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
-        List<RexNode> lowerCond = rel.lowerBound();
-        List<RexNode> upperCond = rel.upperBound();
+        List<SearchBounds> searchBounds = rel.searchBounds();
         RexNode condition = rel.condition();
         List<RexNode> projects = rel.projects();
 
-        Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
-        Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
         Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
         Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
+        RangeIterable<RowT> ranges = searchBounds == null ? null :
+                expressionFactory.ranges(searchBounds, rel.collation(), tbl.getRowType(typeFactory));
 
         IgniteIndex idx = tbl.getIndex(rel.indexName());
         ColocationGroup group = ctx.group(rel.sourceId());
         int[] parts = group.partitions(ctx.localNodeId());
 
-        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, lower, upper, filters, prj, requiredColumns.toBitSet());
+        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, filters, prj, requiredColumns.toBitSet());
     }
 
     /** {@inheritDoc} */
@@ -410,14 +411,10 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     public Node<RowT> visit(IgniteSortedIndexSpool rel) {
         RelCollation collation = rel.collation();
 
-        assert rel.indexCondition() != null : rel;
-
-        List<RexNode> lowerBound = rel.indexCondition().lowerBound();
-        List<RexNode> upperBound = rel.indexCondition().upperBound();
+        assert rel.searchBounds() != null : rel;
 
         Predicate<RowT> filter = expressionFactory.predicate(rel.condition(), rel.getRowType());
-        Supplier<RowT> lower = lowerBound == null ? null : expressionFactory.rowSource(lowerBound);
-        Supplier<RowT> upper = upperBound == null ? null : expressionFactory.rowSource(upperBound);
+        RangeIterable<RowT> ranges = expressionFactory.ranges(rel.searchBounds(), collation, rel.getRowType());
 
         IndexSpoolNode<RowT> node = IndexSpoolNode.createTreeSpool(
                 ctx,
@@ -425,8 +422,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
                 collation,
                 expressionFactory.comparator(collation),
                 filter,
-                lower,
-                upper
+                ranges
         );
 
         Node<RowT> input = visit(rel.getInput());
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
index 027b88ceb5..23495f2007 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -79,18 +79,16 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<RowT> find(RowT lower, RowT upper) {
+    public Cursor<RowT> find(RowT lower, RowT upper, boolean lowerInclude, boolean upperInclude) {
         int firstCol = first(collation.getKeys());
 
-        if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) != null) {
-            return new CursorImpl(rows, lower, upper);
-        } else if (ectx.rowHandler().get(firstCol, lower) == null && ectx.rowHandler().get(firstCol, upper) != null) {
-            return new CursorImpl(rows, null, upper);
-        } else if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) == null) {
-            return new CursorImpl(rows, lower, null);
-        } else {
-            return new CursorImpl(rows, null, null);
-        }
+        Object lowerBound = (lower == null) ? null : ectx.rowHandler().get(firstCol, lower);
+        Object upperBound = (upper == null) ? null : ectx.rowHandler().get(firstCol, upper);
+
+        RowT lowerRow = (lowerBound == null) ? null : lower;
+        RowT upperRow = (upperBound == null) ? null : upper;
+
+        return new CursorImpl(rows, lowerRow, upperRow, lowerInclude, upperInclude);
     }
 
     /**
@@ -100,10 +98,9 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
             ExecutionContext<RowT> ectx,
             RelDataType rowType,
             Predicate<RowT> filter,
-            Supplier<RowT> lowerBound,
-            Supplier<RowT> upperBound
+            RangeIterable<RowT> ranges
     ) {
-        return new IndexScan(rowType, this, filter, lowerBound, upperBound);
+        return new IndexScan(rowType, this, filter, ranges);
     }
 
     /**
@@ -116,14 +113,27 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
         /** Upper bound. */
         private final RowT upper;
 
+        /** Include upper bound. */
+        private final boolean includeUpper;
+
         /** Current index of list element. */
         private int idx;
 
-        CursorImpl(List<RowT> rows, @Nullable RowT lower, @Nullable RowT upper) {
+        /**
+         * Creates sorted index cursor.
+         *
+         * @param rows List of rows.
+         * @param lower Lower bound.
+         * @param upper Upper bound.
+         * @param lowerInclude {@code True} for inclusive lower bound.
+         * @param upperInclude {@code True} for inclusive upper bound.
+         */
+        CursorImpl(List<RowT> rows, @Nullable RowT lower, @Nullable RowT upper, boolean lowerInclude, boolean upperInclude) {
             this.rows = rows;
             this.upper = upper;
+            this.includeUpper = upperInclude;
 
-            idx = lower == null ? 0 : lowerBound(rows, lower);
+            idx = lower == null ? 0 : lowerBound(rows, lower, lowerInclude);
         }
 
         /**
@@ -131,9 +141,10 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
          *
          * @param rows List of rows.
          * @param bound Lower bound.
+         * @param includeBound {@code True} for inclusive bound.
          * @return Lower bound position in the list.
          */
-        private int lowerBound(List<RowT> rows, RowT bound) {
+        private int lowerBound(List<RowT> rows, RowT bound, boolean includeBound) {
             int low = 0;
             int high = rows.size() - 1;
             int idx = -1;
@@ -144,7 +155,7 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
 
                 if (compRes > 0) {
                     high = mid - 1;
-                } else if (compRes == 0) {
+                } else if (compRes == 0 && includeBound) {
                     idx = mid;
                     high = mid - 1;
                 } else {
@@ -158,7 +169,7 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
         /** {@inheritDoc} */
         @Override
         public boolean hasNext() {
-            if (idx == rows.size() || (upper != null && comp.compare(upper, rows.get(idx)) < 0)) {
+            if (idx == rows.size() || (upper != null && comp.compare(upper, rows.get(idx)) < (includeUpper ? 0 : 1))) {
                 return false;
             }
 
@@ -186,14 +197,21 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
      * Index scan for RuntimeSortedIndex.
      */
     private class IndexScan extends AbstractIndexScan<RowT, RowT> {
+        /**
+         * Creates index scan.
+         *
+         * @param rowType Row type.
+         * @param idx Physical index.
+         * @param filter Additional filters.
+         * @param ranges Index scan bounds.
+         */
         IndexScan(
                 RelDataType rowType,
                 TreeIndex<RowT> idx,
                 Predicate<RowT> filter,
-                Supplier<RowT> lowerBound,
-                Supplier<RowT> upperBound
+                RangeIterable<RowT> ranges
         ) {
-            super(RuntimeSortedIndex.this.ectx, rowType, idx, filter, lowerBound, upperBound, null);
+            super(RuntimeSortedIndex.this.ectx, rowType, idx, filter, ranges, null);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
index cd2bd12000..5d40c5d5a7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
@@ -30,7 +30,9 @@ public interface TreeIndex<R> {
      *
      * @param lower Lower bound.
      * @param upper Upper bound.
+     * @param lowerInclude Inclusive lower bound.
+     * @param upperInclude Inclusive upper bound.
      * @return Cursor over the rows within bounds.
      */
-    Cursor<R> find(R lower, R upper);
+    Cursor<R> find(R lower, R upper, boolean lowerInclude, boolean upperInclude);
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactory.java
index 0b148268b7..9867073d55 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactory.java
@@ -31,6 +31,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 
 /**
  * Expression factory.
@@ -105,6 +106,18 @@ public interface ExpressionFactory<RowT> {
      */
     Supplier<RowT> rowSource(List<RexNode> values);
 
+    /**
+     * Creates iterable search bounds tuples (lower row/upper row) by search bounds expressions.
+     *
+     * @param searchBounds Search bounds.
+     * @param collation Collation.
+     * @param rowType Row type.
+     */
+    RangeIterable<RowT> ranges(
+            List<SearchBounds> searchBounds,
+            RelCollation collation,
+            RelDataType rowType
+    );
 
     /**
      * Executes expression.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index de6dae6f56..1bc734e54e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -23,9 +23,13 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -65,14 +69,18 @@ import org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGe
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorsFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.MultiBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 import org.apache.ignite.internal.sql.engine.util.Primitives;
 
 /**
- * Implements rex expression into a function object. Uses JaninoRexCompiler under the hood. Each expression compiles
- * into a class and a wrapper over it is returned.
+ * Implements rex expression into a function object. Uses JaninoRexCompiler under the hood. Each expression compiles into a class and a
+ * wrapper over it is returned.
  */
 public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
     private static final int CACHE_SIZE = 1024;
@@ -234,7 +242,8 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
     }
 
     /** {@inheritDoc} */
-    @Override public BiPredicate<RowT, RowT> biPredicate(RexNode filter, RelDataType rowType) {
+    @Override
+    public BiPredicate<RowT, RowT> biPredicate(RexNode filter, RelDataType rowType) {
         return new BiPredicateImpl(biScalar(filter, rowType));
     }
 
@@ -288,6 +297,127 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         return rows;
     }
 
+
+    /** {@inheritDoc} */
+    @Override
+    public RangeIterable<RowT> ranges(
+            List<SearchBounds> searchBounds,
+            RelCollation collation,
+            RelDataType rowType
+    ) {
+        RowFactory<RowT> rowFactory = ctx.rowHandler().factory(typeFactory, rowType);
+
+        List<RangeCondition<RowT>> ranges = new ArrayList<>();
+
+        expandBounds(
+                ranges,
+                searchBounds,
+                rowType,
+                rowFactory,
+                collation.getKeys(),
+                0,
+                Arrays.asList(new RexNode[searchBounds.size()]),
+                Arrays.asList(new RexNode[searchBounds.size()]),
+                true,
+                true
+        );
+
+        return new RangeIterableImpl(ranges, comparator(collation));
+    }
+
+    /**
+     * Expand column-oriented {@link SearchBounds} to a row-oriented list of ranges ({@link RangeCondition}).
+     *
+     * @param ranges List of ranges.
+     * @param searchBounds Search bounds.
+     * @param rowType Row type.
+     * @param rowFactory Row factory.
+     * @param collationKeys Collation keys.
+     * @param collationKeyIdx Current collation key index (field to process).
+     * @param curLower Current lower row.
+     * @param curUpper Current upper row.
+     * @param lowerInclude Include current lower row.
+     * @param upperInclude Include current upper row.
+     */
+    private void expandBounds(
+            List<RangeCondition<RowT>> ranges,
+            List<SearchBounds> searchBounds,
+            RelDataType rowType,
+            RowFactory<RowT> rowFactory,
+            List<Integer> collationKeys,
+            int collationKeyIdx,
+            List<RexNode> curLower,
+            List<RexNode> curUpper,
+            boolean lowerInclude,
+            boolean upperInclude
+    ) {
+        if ((collationKeyIdx >= collationKeys.size())
+                || (!lowerInclude && !upperInclude)
+                || searchBounds.get(collationKeys.get(collationKeyIdx)) == null) {
+            ranges.add(new RangeConditionImpl(
+                    scalar(curLower, rowType),
+                    scalar(curUpper, rowType),
+                    lowerInclude,
+                    upperInclude,
+                    rowFactory
+            ));
+
+            return;
+        }
+
+        int fieldIdx = collationKeys.get(collationKeyIdx);
+        SearchBounds fieldBounds = searchBounds.get(fieldIdx);
+
+        Collection<SearchBounds> fieldMultiBounds = fieldBounds instanceof MultiBounds
+                ? ((MultiBounds) fieldBounds).bounds()
+                : Collections.singleton(fieldBounds);
+
+        for (SearchBounds fieldSingleBounds : fieldMultiBounds) {
+            RexNode fieldLowerBound;
+            RexNode fieldUpperBound;
+            boolean fieldLowerInclude;
+            boolean fieldUpperInclude;
+
+            if (fieldSingleBounds instanceof ExactBounds) {
+                fieldLowerBound = fieldUpperBound = ((ExactBounds) fieldSingleBounds).bound();
+                fieldLowerInclude = fieldUpperInclude = true;
+            } else if (fieldSingleBounds instanceof RangeBounds) {
+                RangeBounds fieldRangeBounds = (RangeBounds) fieldSingleBounds;
+
+                fieldLowerBound = fieldRangeBounds.lowerBound();
+                fieldUpperBound = fieldRangeBounds.upperBound();
+                fieldLowerInclude = fieldRangeBounds.lowerInclude();
+                fieldUpperInclude = fieldRangeBounds.upperInclude();
+            } else {
+                throw new IllegalStateException("Unexpected bounds: " + fieldSingleBounds);
+            }
+
+            if (lowerInclude) {
+                curLower.set(fieldIdx, fieldLowerBound);
+            }
+
+            if (upperInclude) {
+                curUpper.set(fieldIdx, fieldUpperBound);
+            }
+
+            expandBounds(
+                    ranges,
+                    searchBounds,
+                    rowType,
+                    rowFactory,
+                    collationKeys,
+                    collationKeyIdx + 1,
+                    curLower,
+                    curUpper,
+                    lowerInclude && fieldLowerInclude,
+                    upperInclude && fieldUpperInclude
+            );
+        }
+        // Both rows are shared across recursion: reset this field in each so stale bounds do not leak into sibling branches.
+        curLower.set(fieldIdx, null);
+        curUpper.set(fieldIdx, null);
+    }
+
     /**
      * Creates {@link SingleScalar}, a code-generated expressions evaluator.
      *
@@ -428,7 +558,8 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
             b.append(node.getType().getFullTypeString());
 
             new RexShuttle() {
-                @Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+                @Override
+                public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
                     b.append(", fldIdx=").append(fieldAccess.getField().getIndex());
 
                     return super.visitFieldAccess(fieldAccess);
@@ -492,7 +623,8 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean test(RowT r1, RowT r2) {
+        @Override
+        public boolean test(RowT r1, RowT r2) {
             scalar.execute(ctx, r1, r2, out);
             return Boolean.TRUE == hnd.get(0, out);
         }
@@ -506,7 +638,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         /**
          * Constructor.
          *
-         * @param scalar  Scalar.
+         * @param scalar Scalar.
          * @param factory Row factory.
          */
         private ProjectImpl(SingleScalar scalar, RowFactory<RowT> factory) {
@@ -570,6 +702,120 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         }
     }
 
+    private class RangeConditionImpl implements RangeCondition<RowT> {
+        /** Lower bound expression. */
+        private final SingleScalar lowerBound;
+
+        /** Upper bound expression. */
+        private final SingleScalar upperBound;
+
+        /** Inclusive lower bound flag. */
+        private final boolean lowerInclude;
+
+        /** Inclusive upper bound flag. */
+        private final boolean upperInclude;
+
+        /** Lower row. */
+        private RowT lowerRow;
+
+        /** Upper row. */
+        private RowT upperRow;
+
+        /** Row factory. */
+        private final RowFactory<RowT> factory;
+
+        private RangeConditionImpl(
+                SingleScalar lowerBound,
+                SingleScalar upperBound,
+                boolean lowerInclude,
+                boolean upperInclude,
+                RowFactory<RowT> factory
+        ) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+            this.lowerInclude = lowerInclude;
+            this.upperInclude = upperInclude;
+            this.factory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public RowT lower() {
+            return lowerRow != null ? lowerRow : (lowerRow = getRow(lowerBound));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public RowT upper() {
+            return upperRow != null ? upperRow : (upperRow = getRow(upperBound));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean lowerInclude() {
+            return lowerInclude;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean upperInclude() {
+            return upperInclude;
+        }
+
+        /** Compute row. */
+        private RowT getRow(SingleScalar scalar) {
+            RowT res = factory.create();
+            scalar.execute(ctx, null, res);
+
+            return res;
+        }
+
+        /** Clear cached rows. */
+        public void clearCache() {
+            lowerRow = upperRow = null;
+        }
+    }
+
+    private class RangeIterableImpl implements RangeIterable<RowT> {
+        private final List<RangeCondition<RowT>> ranges;
+
+        private final Comparator<RowT> comparator;
+
+        private boolean sorted;
+
+        public RangeIterableImpl(List<RangeCondition<RowT>> ranges, Comparator<RowT> comparator) {
+            this.ranges = ranges;
+            this.comparator = comparator;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int size() {
+            return ranges.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Iterator<RangeCondition<RowT>> iterator() {
+            ranges.forEach(b -> ((RangeConditionImpl) b).clearCache());
+
+            if (ranges.size() == 1) {
+                return ranges.iterator();
+            }
+
+            // Sort ranges using collation comparator to produce sorted output. There should be no ranges
+            // intersection.
+            // Do not sort again if ranges already were sorted before, different values of correlated variables
+            // should not affect ordering.
+            if (!sorted) {
+                ranges.sort((o1, o2) -> comparator.compare(o1.lower(), o2.lower()));
+                sorted = true;
+            }
+
+            return ranges.iterator();
+        }
+    }
+
     private class BiFieldGetter extends CommonFieldGetter {
         private final Expression row2;
 
@@ -598,7 +844,8 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         }
 
         /** {@inheritDoc} */
-        @Override protected Expression fillExpressions(BlockBuilder list, int index) {
+        @Override
+        protected Expression fillExpressions(BlockBuilder list, int index) {
             Expression row = list.append("row", this.row);
 
             Expression field = Expressions.call(hnd,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeCondition.java
similarity index 61%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeCondition.java
index cd2bd12000..91c9cf5b1a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeCondition.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec;
-
-import org.apache.ignite.internal.util.Cursor;
+package org.apache.ignite.internal.sql.engine.exec.exp;
 
 /**
- * Tree index interface.
+ * A range condition is a search condition which represents a comparison predicate or a BETWEEN predicate.
  *
- * @param <R> Indexing row type.
+ * <p>Used to define bounds of a range scan.
  */
-public interface TreeIndex<R> {
-    /**
-     * Index lookup method.
-     *
-     * @param lower Lower bound.
-     * @param upper Upper bound.
-     * @return Cursor over the rows within bounds.
-     */
-    Cursor<R> find(R lower, R upper);
+public interface RangeCondition<RowT> {
+    /** Lower search row. */
+    public RowT lower();
+
+    /** Upper search row. */
+    public RowT upper();
+
+    /** Inclusive search by lower row. */
+    public boolean lowerInclude();
+
+    /** Inclusive search by upper row. */
+    public boolean upperInclude();
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeIterable.java
similarity index 67%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeIterable.java
index cd2bd12000..773b16a9a8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TreeIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RangeIterable.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec;
-
-import org.apache.ignite.internal.util.Cursor;
+package org.apache.ignite.internal.sql.engine.exec.exp;
 
 /**
- * Tree index interface.
- *
- * @param <R> Indexing row type.
+ * Iterable over range conditions.
  */
-public interface TreeIndex<R> {
-    /**
-     * Index lookup method.
-     *
-     * @param lower Lower bound.
-     * @param upper Upper bound.
-     * @return Cursor over the rows within bounds.
-     */
-    Cursor<R> find(R lower, R upper);
+public interface RangeIterable<RowT> extends Iterable<RangeCondition<RowT>> {
+    /** Count of ranges in iterable. */
+    public int size();
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index c33e4e5b4c..90064f3c3a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Flow;
@@ -27,7 +28,6 @@ import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.index.SortedIndex;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowConverter;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
@@ -68,11 +70,9 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
 
-    private final @Nullable Supplier<RowT> lowerCond;
+    private final RangeIterable<RowT> rangeConditions;
 
-    private final @Nullable Supplier<RowT> upperCond;
-
-    private final int flags;
+    private Iterator<RangeCondition<RowT>> rangeConditionIterator;
 
     private int requested;
 
@@ -84,10 +84,6 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
 
     private int curPartIdx;
 
-    private @Nullable BinaryTuple lowerBound;
-
-    private @Nullable BinaryTuple upperBound;
-
     /**
      * Constructor.
      *
@@ -95,6 +91,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
      * @param rowType Output type of the current node.
      * @param schemaTable The table this node should scan.
      * @param parts Partition numbers to scan.
+     * @param rangeConditions Range conditions.
      * @param filters Optional filter to filter out rows.
      * @param rowTransformer Optional projection function.
      * @param requiredColumns Optional set of column of interest.
@@ -105,8 +102,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
             IgniteIndex schemaIndex,
             InternalIgniteTable schemaTable,
             int[] parts,
-            @Nullable Supplier<RowT> lowerCond,
-            @Nullable Supplier<RowT> upperCond,
+            @Nullable RangeIterable<RowT> rangeConditions,
             @Nullable Predicate<RowT> filters,
             @Nullable Function<RowT, RowT> rowTransformer,
             @Nullable BitSet requiredColumns
@@ -116,18 +112,14 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
 
         this.schemaIndex = schemaIndex;
         this.parts = parts;
-        this.lowerCond = lowerCond;
-        this.upperCond = upperCond;
         this.filters = filters;
         this.rowTransformer = rowTransformer;
         this.requiredColumns = requiredColumns;
+        this.rangeConditions = rangeConditions;
 
         factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
 
         indexRowSchema = RowConverter.createIndexRowSchema(schemaTable.descriptor(), schemaIndex.index().descriptor());
-
-        // TODO: create ticket to add flags support
-        flags = SortedIndex.INCLUDE_LEFT & SortedIndex.INCLUDE_RIGHT;
     }
 
     /** {@inheritDoc} */
@@ -162,8 +154,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
         requested = 0;
         waiting = 0;
         curPartIdx = 0;
-        lowerBound = null;
-        upperBound = null;
+        rangeConditionIterator = null;
 
         if (activeSubscription != null) {
             activeSubscription.cancel();
@@ -246,13 +237,36 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
             if (schemaIndex.type() == Type.SORTED) {
                 //TODO: https://issues.apache.org/jira/browse/IGNITE-17813
                 // Introduce new publisher using merge-sort algo to merge partition index publishers.
-                if (lowerBound == null && upperBound == null) {
-                    lowerBound = toBinaryTuplePrefix(lowerCond);
-                    upperBound = toBinaryTuplePrefix(upperCond);
+                int part = curPartIdx;
+
+                int flags = 0;
+                BinaryTuple lowerBound = null;
+                BinaryTuple upperBound = null;
+
+                if (rangeConditions == null) {
+                    flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT;
+                    curPartIdx++;
+                } else {
+                    if (rangeConditionIterator == null) {
+                        rangeConditionIterator = rangeConditions.iterator();
+                    }
+
+                    RangeCondition<RowT> cond = rangeConditionIterator.next();
+
+                    lowerBound = toBinaryTuplePrefix(cond.lower());
+                    upperBound = toBinaryTuplePrefix(cond.upper());
+
+                    flags |= (cond.lowerInclude()) ? SortedIndex.INCLUDE_LEFT : 0;
+                    flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0;
+
+                    if (!rangeConditionIterator.hasNext()) { // Switch to next partition and reset range index.
+                        rangeConditionIterator = null;
+                        curPartIdx++;
+                    }
                 }
 
                 ((SortedIndex) schemaIndex.index()).scan(
-                        parts[curPartIdx++],
+                        parts[part],
                         context().transaction(),
                         lowerBound,
                         upperBound,
@@ -261,16 +275,33 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
                 ).subscribe(new SubscriberImpl());
             } else {
                 assert schemaIndex.type() == Type.HASH;
-                assert lowerCond == upperCond;
 
-                if (lowerBound == null) {
-                    lowerBound = toBinaryTuple(lowerCond);
+                int part = curPartIdx;
+                BinaryTuple key = null;
+
+                if (rangeConditions == null) {
+                    curPartIdx++;
+                } else {
+                    if (rangeConditionIterator == null) {
+                        rangeConditionIterator = rangeConditions.iterator();
+                    }
+
+                    RangeCondition<RowT> cond = rangeConditionIterator.next();
+
+                    assert cond.lower() == cond.upper();
+
+                    key = toBinaryTuple(cond.lower());
+
+                    if (!rangeConditionIterator.hasNext()) { // Switch to next partition and reset range index.
+                        rangeConditionIterator = null;
+                        curPartIdx++;
+                    }
                 }
 
                 schemaIndex.index().scan(
-                        parts[curPartIdx++],
+                        parts[part],
                         context().transaction(),
-                        lowerBound,
+                        key,
                         requiredColumns
                 ).subscribe(new SubscriberImpl());
             }
@@ -330,21 +361,21 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     }
 
     @Contract("null -> null")
-    private @Nullable BinaryTuple toBinaryTuplePrefix(@Nullable Supplier<RowT> conditionSupplier) {
-        if (conditionSupplier == null) {
+    private @Nullable BinaryTuple toBinaryTuplePrefix(@Nullable RowT condition) {
+        if (condition == null) {
             return null;
         }
 
-        return RowConverter.toBinaryTuplePrefix(context(), indexRowSchema, factory, conditionSupplier.get());
+        return RowConverter.toBinaryTuplePrefix(context(), indexRowSchema, factory, condition);
     }
 
     @Contract("null -> null")
-    private @Nullable BinaryTuple toBinaryTuple(@Nullable Supplier<RowT> conditionSupplier) {
-        if (conditionSupplier == null) {
+    private @Nullable BinaryTuple toBinaryTuple(@Nullable RowT condition) {
+        if (condition == null) {
             return null;
         }
 
-        return RowConverter.toBinaryTuple(context(), indexRowSchema, factory, conditionSupplier.get());
+        return RowConverter.toBinaryTuple(context(), indexRowSchema, factory, condition);
     }
 
     private RowT convert(BinaryTuple binaryTuple) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
index a2360f452d..a452324a54 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RuntimeHashIndex;
 import org.apache.ignite.internal.sql.engine.exec.RuntimeIndex;
 import org.apache.ignite.internal.sql.engine.exec.RuntimeSortedIndex;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -172,8 +173,7 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo
             RelCollation collation,
             Comparator<RowT> comp,
             Predicate<RowT> filter,
-            Supplier<RowT> lowerIdxBound,
-            Supplier<RowT> upperIdxBound
+            RangeIterable<RowT> ranges
     ) {
         RuntimeSortedIndex<RowT> idx = new RuntimeSortedIndex<>(ctx, collation, comp);
 
@@ -184,8 +184,7 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo
                         ctx,
                         rowType,
                         filter,
-                        lowerIdxBound,
-                        upperIdxBound
+                        ranges
                 )
         );
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
index f3dff244c1..a614438074 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.sql.engine.externalize;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelInput;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 
 /**
  * Extension to the {@link RelInput} interface.
@@ -41,4 +43,12 @@ public interface RelInputEx extends RelInput {
      * @return A table with given id.
      */
     RelOptTable getTableById();
+
+    /**
+     * Returns search bounds.
+     *
+     * @param tag Tag.
+     * @return Search bounds.
+     */
+    List<SearchBounds> getSearchBounds(String tag);
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
index fbe8f67a5c..c4e90df8d8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
@@ -104,6 +104,10 @@ import org.apache.calcite.sql.validate.SqlNameMatchers;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.MultiBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.InternalIgniteRel;
 import org.apache.ignite.internal.sql.engine.trait.DistributionFunction;
 import org.apache.ignite.internal.sql.engine.trait.DistributionTrait;
@@ -300,6 +304,8 @@ class RelJson {
             return toJson((RelDataTypeField) value);
         } else if (value instanceof ByteString) {
             return toJson((ByteString) value);
+        } else if (value instanceof SearchBounds) {
+            return toJson((SearchBounds) value);
         } else {
             throw new UnsupportedOperationException("type not serializable: "
                     + value + " (type " + value.getClass().getCanonicalName() + ")");
@@ -355,7 +361,7 @@ class RelJson {
             map.put("type", toJson(node.getSqlTypeName()));
             map.put("elementType", toJson(node.getComponentType()));
             return map;
-        } else  if (node.getSqlTypeName() == SqlTypeName.MAP) {
+        } else if (node.getSqlTypeName() == SqlTypeName.MAP) {
             Map<String, Object> map = map();
             map.put("type", toJson(node.getSqlTypeName()));
             map.put("keyType", toJson(node.getKeyType()));
@@ -569,6 +575,59 @@ class RelJson {
         return map;
     }
 
+    private Object toJson(SearchBounds val) {
+        Map map = map();
+        map.put("type", val.type().name());
+
+        if (val instanceof ExactBounds) {
+            map.put("bound", toJson(((ExactBounds) val).bound()));
+        } else if (val instanceof MultiBounds) {
+            map.put("bounds", toJson(((MultiBounds) val).bounds()));
+        } else {
+            assert val instanceof RangeBounds : val;
+
+            RangeBounds val0 = (RangeBounds) val;
+
+            map.put("lowerBound", val0.lowerBound() == null ? null : toJson(val0.lowerBound()));
+            map.put("upperBound", val0.upperBound() == null ? null : toJson(val0.upperBound()));
+            map.put("lowerInclude", val0.lowerInclude());
+            map.put("upperInclude", val0.upperInclude());
+        }
+
+        return map;
+    }
+
+    private SearchBounds toSearchBound(RelInput input, Map<String, Object> map) {
+        if (map == null) {
+            return null;
+        }
+
+        String type = (String) map.get("type");
+
+        if (SearchBounds.Type.EXACT.name().equals(type)) {
+            return new ExactBounds(null, toRex(input, map.get("bound")));
+        } else if (SearchBounds.Type.MULTI.name().equals(type)) {
+            return new MultiBounds(null, toSearchBoundList(input, (List<Map<String, Object>>) map.get("bounds")));
+        } else if (SearchBounds.Type.RANGE.name().equals(type)) {
+            return new RangeBounds(null,
+                    toRex(input, map.get("lowerBound")),
+                    toRex(input, map.get("upperBound")),
+                    (Boolean) map.get("lowerInclude"),
+                    (Boolean) map.get("upperInclude")
+            );
+        }
+
+        throw new IllegalStateException("Unsupported search bound type: " + type);
+    }
+
+    List<SearchBounds> toSearchBoundList(RelInput input, List<Map<String, Object>> bounds) {
+        if (bounds == null) {
+            return null;
+        }
+
+        return bounds.stream().map(b -> toSearchBound(input, b)).collect(Collectors.toList());
+    }
+
     RelCollation toCollation(List<Map<String, Object>> jsonFieldCollations) {
         if (jsonFieldCollations == null) {
             return RelCollations.EMPTY;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index ea55c72dc3..645256b1a9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -49,6 +49,7 @@ import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -331,6 +332,12 @@ public class RelJsonReader {
             return relJson.toCollation((List) get(tag));
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public List<SearchBounds> getSearchBounds(String tag) {
+            return relJson.toSearchBoundList(this, (List<Map<String, Object>>) get(tag));
+        }
+
         /** {@inheritDoc} */
         @Override
         public RelDistribution getDistribution() {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
index bab765e7ef..fc1a9b3721 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
@@ -31,6 +31,9 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.BuiltInMethod;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.AbstractIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
@@ -55,37 +58,32 @@ public class IgniteMdSelectivity extends RelMdSelectivity {
             return getSelectivity((ProjectableFilterableTableScan) rel, mq, predicate);
         }
 
-        List<RexNode> lowerCond = rel.lowerCondition();
-        List<RexNode> upperCond = rel.upperCondition();
+        List<SearchBounds> searchBounds = rel.searchBounds();
 
-        if (nullOrEmpty(lowerCond) && nullOrEmpty(upperCond)) {
+        if (nullOrEmpty(searchBounds)) {
             return RelMdUtil.guessSelectivity(rel.condition());
         }
 
         double idxSelectivity = 1.0;
-        int len = nullOrEmpty(lowerCond) ? upperCond.size() : nullOrEmpty(upperCond) ? lowerCond.size() :
-                Math.max(lowerCond.size(), upperCond.size());
 
-        for (int i = 0; i < len; i++) {
-            RexCall lower = nullOrEmpty(lowerCond) || lowerCond.size() <= i ? null : (RexCall) lowerCond.get(i);
-            RexCall upper = nullOrEmpty(upperCond) || upperCond.size() <= i ? null : (RexCall) upperCond.get(i);
-
-            assert lower != null || upper != null;
+        List<RexNode> conjunctions = RelOptUtil.conjunctions(rel.condition());
 
-            if (lower != null && upper != null) {
-                idxSelectivity *= lower.op.kind == SqlKind.EQUALS ? .1 : .2;
-            } else {
-                idxSelectivity *= .35;
+        for (SearchBounds bounds : searchBounds) {
+            if (bounds != null) {
+                conjunctions.remove(bounds.condition());
             }
-        }
 
-        List<RexNode> conjunctions = RelOptUtil.conjunctions(rel.condition());
+            if (bounds instanceof ExactBounds) {
+                idxSelectivity *= .1;
+            } else if (bounds instanceof RangeBounds) {
+                RangeBounds rangeBounds = (RangeBounds) bounds;
 
-        if (!nullOrEmpty(lowerCond)) {
-            conjunctions.removeAll(lowerCond);
-        }
-        if (!nullOrEmpty(upperCond)) {
-            conjunctions.removeAll(upperCond);
+                if (rangeBounds.condition() != null) {
+                    idxSelectivity *= ((RexCall) rangeBounds.condition()).op.kind == SqlKind.EQUALS ? .1 : .2;
+                } else {
+                    idxSelectivity *= .35;
+                }
+            }
         }
 
         RexNode remaining = RexUtil.composeConjunction(RexUtils.builder(rel), conjunctions, true);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePrograms.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePrograms.java
index 62c3d16907..c9346c4f11 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePrograms.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePrograms.java
@@ -54,6 +54,8 @@ public class IgnitePrograms {
             final HepPlanner hepPlanner = new HepPlanner(builder.build(), Commons.context(rel), true,
                     null, Commons.context(rel).config().getCostFactory());
 
+            hepPlanner.setExecutor(planner.getExecutor());
+
             for (RelOptMaterialization materialization : materializations) {
                 hepPlanner.addMaterialization(materialization);
             }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/ExactBounds.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/ExactBounds.java
new file mode 100644
index 0000000000..fa24740e93
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/ExactBounds.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.bounds;
+
+import java.util.Objects;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Exact (equals) bounds holder for search row.
+ */
+public class ExactBounds extends SearchBounds {
+    /** Search bound. */
+    private final RexNode bound;
+
+    public ExactBounds(RexNode condition, RexNode bound) {
+        super(condition);
+        this.bound = bound;
+    }
+
+    /**
+     * Returns search bound.
+     */
+    public RexNode bound() {
+        return bound;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Type type() {
+        return Type.EXACT;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        return bound.equals(((ExactBounds) o).bound);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+        return Objects.hash(bound);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(ExactBounds.class, this);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/MultiBounds.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/MultiBounds.java
new file mode 100644
index 0000000000..a7ace798ec
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/MultiBounds.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.bounds;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Multiple bounds holder for search row.
+ */
+public class MultiBounds extends SearchBounds {
+    @IgniteToStringInclude
+    private final List<SearchBounds> bounds;
+
+    public MultiBounds(RexNode condition, List<SearchBounds> bounds) {
+        super(condition);
+        this.bounds = bounds;
+    }
+
+    /**
+     * Returns search bounds.
+     */
+    public List<SearchBounds> bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Type type() {
+        return Type.MULTI;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        return bounds.equals(((MultiBounds) o).bounds);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+        return Objects.hash(bounds);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(MultiBounds.class, this);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/RangeBounds.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/RangeBounds.java
new file mode 100644
index 0000000000..258d96210d
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/RangeBounds.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.bounds;
+
+import java.util.Objects;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Range bounds holder for search row. Either side of the range may be absent ({@code null}).
+ */
+public class RangeBounds extends SearchBounds {
+    /** Lower search bound, {@code null} if not specified. */
+    private final @Nullable RexNode lowerBound;
+
+    /** Upper search bound, {@code null} if not specified. */
+    private final @Nullable RexNode upperBound;
+
+    /** Inclusive lower bound flag. */
+    private final boolean lowerInclude;
+
+    /** Inclusive upper bound flag. */
+    private final boolean upperInclude;
+
+    /**
+     * Create range bounds.
+     *
+     * @param condition Condition.
+     * @param lowerBound Range lower bound, may be {@code null}.
+     * @param upperBound Range upper bound, may be {@code null}.
+     * @param lowerInclude Inclusive lower bound flag.
+     * @param upperInclude Inclusive upper bound flag.
+     */
+    public RangeBounds(
+            RexNode condition,
+            @Nullable RexNode lowerBound,
+            @Nullable RexNode upperBound,
+            boolean lowerInclude,
+            boolean upperInclude
+    ) {
+        super(condition);
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+        this.lowerInclude = lowerInclude;
+        this.upperInclude = upperInclude;
+    }
+
+    /**
+     * Returns lower search bound, or {@code null} if none was passed to the constructor.
+     */
+    public @Nullable RexNode lowerBound() {
+        return lowerBound;
+    }
+
+    /**
+     * Returns upper search bound, or {@code null} if none was passed to the constructor.
+     */
+    public @Nullable RexNode upperBound() {
+        return upperBound;
+    }
+
+    /**
+     * Returns {@code true} if the lower bound is inclusive, {@code false} otherwise.
+     */
+    public boolean lowerInclude() {
+        return lowerInclude;
+    }
+
+    /**
+     * Returns {@code true} if the upper bound is inclusive, {@code false} otherwise.
+     */
+    public boolean upperInclude() {
+        return upperInclude;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Type type() {
+        return Type.RANGE;
+    }
+
+    /** Compares both bounds and both inclusiveness flags; the condition is not part of equality. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        return lowerInclude == ((RangeBounds) o).lowerInclude
+                && upperInclude == ((RangeBounds) o).upperInclude
+                && Objects.equals(lowerBound, ((RangeBounds) o).lowerBound)
+                && Objects.equals(upperBound, ((RangeBounds) o).upperBound);
+    }
+
+    /** Hashes the same four fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(lowerBound, upperBound, lowerInclude, upperInclude);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(RangeBounds.class, this);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/SearchBounds.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/SearchBounds.java
new file mode 100644
index 0000000000..e7ef3303c3
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/bounds/SearchBounds.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.bounds;
+
+import org.apache.calcite.rex.RexNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Bounds holder for search row.
+ *
+ * <p>This class describes bounds on a per-column basis; this choice was made to optimize serialization of a plan.
+ */
+public abstract class SearchBounds {
+    /** Condition required only for cost calculation, no serialization needed. May be {@code null}. */
+    private final @Nullable RexNode condition;
+
+    /**
+     * Creates search bounds.
+     *
+     * @param condition Condition, may be {@code null}.
+     */
+    protected SearchBounds(@Nullable RexNode condition) {
+        this.condition = condition;
+    }
+
+    /**
+     * Returns the condition passed to the constructor, or {@code null} if none was given.
+     */
+    public @Nullable RexNode condition() {
+        return condition;
+    }
+
+    /**
+     * Returns bounds type.
+     */
+    public abstract Type type();
+
+    /**
+     * Search bounds type.
+     */
+    public enum Type {
+        /** Exact search value. */
+        EXACT,
+
+        /** Range of values. */
+        RANGE,
+
+        /** Multiple values or multiple ranges. */
+        MULTI
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
index f12db08dbb..e1be677d5e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
@@ -31,10 +31,11 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
-import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
-import org.apache.ignite.internal.sql.engine.util.IndexConditions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -43,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
 public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
     protected final String idxName;
 
-    protected final IndexConditions idxCond;
+    protected final List<SearchBounds> searchBounds;
 
     protected final IgniteIndex.Type type;
 
@@ -56,7 +57,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
         super(input);
         idxName = input.getString("index");
         type = input.getEnum("type", IgniteIndex.Type.class);
-        idxCond = new IndexConditions(input);
+        searchBounds = ((RelInputEx) input).getSearchBounds("searchBounds");
     }
 
     /**
@@ -72,14 +73,14 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
             IgniteIndex.Type type,
             @Nullable List<RexNode> proj,
             @Nullable RexNode cond,
-            @Nullable IndexConditions idxCond,
+            @Nullable List<SearchBounds> searchBounds,
             @Nullable ImmutableBitSet reqColumns
     ) {
         super(cluster, traitSet, hints, table, proj, cond, reqColumns);
 
         this.idxName = idxName;
         this.type = type;
-        this.idxCond = idxCond;
+        this.searchBounds = searchBounds;
     }
 
     /** {@inheritDoc} */
@@ -88,8 +89,9 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
         pw = pw.item("index", idxName);
         pw = pw.item("type", type.name());
         pw = super.explainTerms0(pw);
+        pw = pw.itemIf("searchBounds", searchBounds, searchBounds != null);
 
-        return idxCond.explainTerms(pw);
+        return pw;
     }
 
     /**
@@ -100,71 +102,27 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
         return idxName;
     }
 
-    /**
-     * Get lower index condition.
-     */
-    public List<RexNode> lowerCondition() {
-        return idxCond == null ? null : idxCond.lowerCondition();
-    }
-
-    /**
-     * Get lower index condition.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public List<RexNode> lowerBound() {
-        return idxCond == null ? null : idxCond.lowerBound();
-    }
-
-    /**
-     * Get upper index condition.
-     */
-    public List<RexNode> upperCondition() {
-        return idxCond == null ? null : idxCond.upperCondition();
-    }
-
-    /**
-     * Get upper index condition.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public List<RexNode> upperBound() {
-        return idxCond == null ? null : idxCond.upperBound();
-    }
-
     /** {@inheritDoc} */
     @Override
     public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
         double rows = table.getRowCount();
 
-        double cost = rows * IgniteCost.ROW_PASS_THROUGH_COST;
+        double cost;
 
-        if (condition != null) {
+        if (condition == null) {
+            cost = rows * IgniteCost.ROW_PASS_THROUGH_COST;
+        } else {
             RexBuilder builder = getCluster().getRexBuilder();
 
             double selectivity = 1;
 
             cost = 0;
 
-            // for hash index both bounds are set to the same search row
-            // because only lookup is possible. So if either bound is null,
-            // then there will be a full index scan.
-            if (type == Type.HASH && lowerBound() != null) {
-                cost += IgniteCost.HASH_LOOKUP_COST;
-
-                selectivity -= 1 - mq.getSelectivity(this, RexUtil.composeConjunction(builder, List.of(condition)));
-            } else if (type == Type.SORTED) {
-                if (lowerCondition() != null) {
-                    double selectivity0 = mq.getSelectivity(this, RexUtil.composeConjunction(builder, lowerCondition()));
-
-                    selectivity -= 1 - selectivity0;
-
-                    cost += Math.log(rows);
-                }
-
-                if (upperCondition() != null && lowerCondition() != null && !lowerCondition().equals(upperCondition())) {
-                    double selectivity0 = mq.getSelectivity(this, RexUtil.composeConjunction(builder, upperCondition()));
+            if (searchBounds != null) {
+                selectivity = mq.getSelectivity(this, RexUtil.composeConjunction(builder,
+                        Commons.transform(searchBounds, b -> b == null ? null : b.condition())));
 
-                    selectivity -= 1 - selectivity0;
-                }
+                cost = Math.log(rows) * IgniteCost.ROW_COMPARISON_COST;
             }
 
             rows *= selectivity;
@@ -184,7 +142,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
      * Get index conditions.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public IndexConditions indexConditions() {
-        return idxCond;
+    public List<SearchBounds> searchBounds() {
+        return searchBounds;
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteIndexScan.java
index 7fdae8a055..0236c8233c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteIndexScan.java
@@ -27,8 +27,8 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
-import org.apache.ignite.internal.sql.engine.util.IndexConditions;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -63,6 +63,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param type Type of the index.
      * @param proj Projects.
      * @param cond Filters.
+     * @param searchBounds Index search conditions.
      * @param requiredCols Participating columns.
      */
     public IgniteIndexScan(
@@ -73,10 +74,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
             IgniteIndex.Type type,
             @Nullable List<RexNode> proj,
             @Nullable RexNode cond,
-            @Nullable IndexConditions idxCond,
+            @Nullable List<SearchBounds> searchBounds,
             @Nullable ImmutableBitSet requiredCols
     ) {
-        this(-1L, cluster, traits, tbl, idxName, type, proj, cond, idxCond, requiredCols);
+        this(-1L, cluster, traits, tbl, idxName, type, proj, cond, searchBounds, requiredCols);
     }
 
     /**
@@ -88,6 +89,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param idxName      Index name.
      * @param proj         Projects.
      * @param cond         Filters.
+     * @param searchBounds Index search conditions.
      * @param requiredCols Participating columns.
      */
     private IgniteIndexScan(
@@ -99,10 +101,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
             IgniteIndex.Type type,
             @Nullable List<RexNode> proj,
             @Nullable RexNode cond,
-            @Nullable IndexConditions idxCond,
+            @Nullable List<SearchBounds> searchBounds,
             @Nullable ImmutableBitSet requiredCols
     ) {
-        super(cluster, traits, List.of(), tbl, idxName, type, proj, cond, idxCond, requiredCols);
+        super(cluster, traits, List.of(), tbl, idxName, type, proj, cond, searchBounds, requiredCols);
 
         this.sourceId = sourceId;
     }
@@ -130,13 +132,13 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     @Override
     public IgniteRel clone(long sourceId) {
         return new IgniteIndexScan(sourceId, getCluster(), getTraitSet(), getTable(),
-                idxName, type, projects, condition, idxCond, requiredColumns);
+                idxName, type, projects, condition, searchBounds, requiredColumns);
     }
 
     /** {@inheritDoc} */
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteIndexScan(sourceId, cluster, getTraitSet(), getTable(),
-                idxName, type, projects, condition, idxCond, requiredColumns);
+                idxName, type, projects, condition, searchBounds, requiredColumns);
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSortedIndexSpool.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSortedIndexSpool.java
index 6dd8b78f45..2452d05bab 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSortedIndexSpool.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSortedIndexSpool.java
@@ -30,9 +30,10 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
-import org.apache.ignite.internal.sql.engine.util.IndexConditions;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 
 /**
  * Relational operator that returns the sorted contents of a table and allow to lookup rows by specified bounds.
@@ -40,8 +41,8 @@ import org.apache.ignite.internal.sql.engine.util.IndexConditions;
 public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements InternalIgniteRel {
     private final RelCollation collation;
 
-    /** Index condition. */
-    private final IndexConditions idxCond;
+    /** Index search conditions. */
+    private final List<SearchBounds> searchBounds;
 
     /** Filters. */
     protected final RexNode condition;
@@ -56,14 +57,14 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
             RelNode input,
             RelCollation collation,
             RexNode condition,
-            IndexConditions idxCond
+            List<SearchBounds> searchBounds
     ) {
         super(cluster, traits, Type.LAZY, input);
 
-        assert Objects.nonNull(idxCond);
+        assert Objects.nonNull(searchBounds);
         assert Objects.nonNull(condition);
 
-        this.idxCond = idxCond;
+        this.searchBounds = searchBounds;
         this.condition = condition;
         this.collation = collation;
     }
@@ -79,7 +80,7 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
                 input.getInputs().get(0),
                 input.getCollation(),
                 input.getExpression("condition"),
-                new IndexConditions(input)
+                ((RelInputEx) input).getSearchBounds("searchBounds")
         );
     }
 
@@ -95,13 +96,13 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
      */
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSortedIndexSpool(cluster, getTraitSet(), inputs.get(0), collation, condition, idxCond);
+        return new IgniteSortedIndexSpool(cluster, getTraitSet(), inputs.get(0), collation, condition, searchBounds);
     }
 
     /** {@inheritDoc} */
     @Override
     protected Spool copy(RelTraitSet traitSet, RelNode input, Type readType, Type writeType) {
-        return new IgniteSortedIndexSpool(getCluster(), traitSet, input, collation, condition, idxCond);
+        return new IgniteSortedIndexSpool(getCluster(), traitSet, input, collation, condition, searchBounds);
     }
 
     /** {@inheritDoc} */
@@ -120,8 +121,9 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
 
         writer.item("condition", condition);
         writer.item("collation", collation);
+        writer.itemIf("searchBounds", searchBounds, searchBounds != null);
 
-        return idxCond.explainTerms(writer);
+        return writer;
     }
 
     /** {@inheritDoc} */
@@ -131,10 +133,10 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
     }
 
     /**
-     * Get index condition.
+     * Get index search conditions.
      */
-    public IndexConditions indexCondition() {
-        return idxCond;
+    public List<SearchBounds> searchBounds() {
+        return searchBounds;
     }
 
     /**
@@ -160,7 +162,7 @@ public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements Inter
         double totalBytes = rowCnt * bytesPerRow;
         double cpuCost;
 
-        if (idxCond.lowerCondition() != null) {
+        if (searchBounds != null) {
             cpuCost = Math.log(rowCnt) * IgniteCost.ROW_COMPARISON_COST;
         } else {
             cpuCost = rowCnt * IgniteCost.ROW_PASS_THROUGH_COST;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
index af60973f0f..1d9ec34033 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
@@ -142,9 +142,12 @@ public abstract class ProjectableFilterableTableScan extends TableScan {
     }
 
     protected RelWriter explainTerms0(RelWriter pw) {
-        return pw
-                .itemIf("filters", condition, condition != null)
-                .itemIf("projects", projects, projects != null)
+        if (condition != null) {
+            pw.item("filters", pw.nest() ? condition :
+                    RexUtil.expandSearch(getCluster().getRexBuilder(), null, condition));
+        }
+
+        return pw.itemIf("projects", projects, projects != null)
                 .itemIf("requiredColumns", requiredColumns, requiredColumns != null);
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/logical/IgniteLogicalIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/logical/IgniteLogicalIndexScan.java
index 9ef55fe0a4..34f3226577 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/logical/IgniteLogicalIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/logical/IgniteLogicalIndexScan.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.AbstractIndexScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
@@ -32,7 +33,6 @@ import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.IndexConditions;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -62,11 +62,11 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
             collation = collation.apply(targetMapping);
         }
 
-        IndexConditions idxCond;
+        List<SearchBounds> searchBounds;
         if (index.type() == Type.HASH) {
-            idxCond = buildHashIndexConditions(cluster, tbl, index.columns(), cond, requiredColumns);
+            searchBounds = buildHashIndexConditions(cluster, tbl, index.columns(), cond, requiredColumns);
         } else if (index.type() == Type.SORTED) {
-            idxCond = buildSortedIndexConditions(cluster, tbl, collation, cond, requiredColumns);
+            searchBounds = buildSortedIndexConditions(cluster, tbl, collation, cond, requiredColumns);
         } else {
             throw new AssertionError("Unknown index type [type=" + index.type() + "]");
         }
@@ -79,7 +79,7 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
                 index.type(),
                 proj,
                 cond,
-                idxCond,
+                searchBounds,
                 requiredColumns);
     }
 
@@ -93,7 +93,7 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
      * @param type Type of the index.
      * @param proj Projects.
      * @param cond Filters.
-     * @param idxCond Index conditions.
+     * @param searchBounds Index search conditions.
      * @param requiredCols Participating columns.
      */
     private IgniteLogicalIndexScan(
@@ -104,13 +104,13 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
             IgniteIndex.Type type,
             @Nullable List<RexNode> proj,
             @Nullable RexNode cond,
-            @Nullable IndexConditions idxCond,
+            @Nullable List<SearchBounds> searchBounds,
             @Nullable ImmutableBitSet requiredCols
     ) {
-        super(cluster, traits, List.of(), tbl, idxName, type, proj, cond, idxCond, requiredCols);
+        super(cluster, traits, List.of(), tbl, idxName, type, proj, cond, searchBounds, requiredCols);
     }
 
-    private static IndexConditions buildSortedIndexConditions(
+    private static List<SearchBounds> buildSortedIndexConditions(
             RelOptCluster cluster,
             InternalIgniteTable table,
             RelCollation collation,
@@ -118,7 +118,7 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
             @Nullable ImmutableBitSet requiredColumns
     ) {
         if (collation.getFieldCollations().isEmpty()) {
-            return new IndexConditions();
+            return List.of();
         }
 
         return RexUtils.buildSortedIndexConditions(
@@ -130,7 +130,7 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
         );
     }
 
-    private static IndexConditions buildHashIndexConditions(
+    private static List<SearchBounds> buildHashIndexConditions(
             RelOptCluster cluster,
             InternalIgniteTable table,
             List<String> indexedColumns,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
index 28271a7cfd..2d1467f5f6 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.sql.engine.rule;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import java.util.ArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.List;
-import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -35,13 +37,12 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Spool;
-import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableSpool;
 import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
-import org.apache.ignite.internal.sql.engine.util.IndexConditions;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
 import org.immutables.value.Value;
 
@@ -76,7 +77,7 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
 
         RelCollation inCollation = TraitUtils.collation(input);
 
-        IndexConditions idxCond = RexUtils.buildSortedIndexConditions(
+        List<SearchBounds> searchBounds = RexUtils.buildSortedIndexConditions(
                 cluster,
                 inCollation,
                 filter.getCondition(),
@@ -84,7 +85,7 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
                 null
         );
 
-        if (nullOrEmpty(idxCond.lowerCondition()) && nullOrEmpty(idxCond.upperCondition())) {
+        if (nullOrEmpty(searchBounds)) {
             return;
         }
 
@@ -92,24 +93,14 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
         RelCollation searchCollation;
 
         if (inCollation == null || inCollation.isDefault()) {
-            // Create collation by index condition.
-            List<RexNode> lowerBound = idxCond.lowerBound();
-            List<RexNode> upperBound = idxCond.upperBound();
-
-            assert lowerBound == null || upperBound == null || lowerBound.size() == upperBound.size();
-
-            int cardinality = lowerBound != null ? lowerBound.size() : upperBound.size();
-
-            List<Integer> equalsFields = new ArrayList<>(cardinality);
-            List<Integer> otherFields = new ArrayList<>(cardinality);
+            // Create collation by index bounds.
+            IntList equalsFields = new IntArrayList(searchBounds.size());
+            IntList otherFields = new IntArrayList(searchBounds.size());
 
             // First, add all equality filters to collation, then add other fields.
-            for (int i = 0; i < cardinality; i++) {
-                RexNode lowerNode = lowerBound != null ? lowerBound.get(i) : null;
-                RexNode upperNode = upperBound != null ? upperBound.get(i) : null;
-
-                if (RexUtils.isNotNull(lowerNode) || RexUtils.isNotNull(upperNode)) {
-                    (Objects.equals(lowerNode, upperNode) ? equalsFields : otherFields).add(i);
+            for (int i = 0; i < searchBounds.size(); i++) {
+                if (searchBounds.get(i) != null) {
+                    (searchBounds.get(i).type() == SearchBounds.Type.EXACT ? equalsFields : otherFields).add(i);
                 }
             }
 
@@ -120,7 +111,12 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
             // Create search collation as a prefix of input collation.
             traitCollation = inCollation;
 
-            Set<Integer> searchKeys = idxCond.keys();
+            IntSet searchKeys = new IntOpenHashSet();
+
+            Ord.zip(searchBounds).stream()
+                    .filter(v -> v.e != null)
+                    .mapToInt(v -> v.i)
+                    .forEach(searchKeys::add);
 
             List<RelFieldCollation> collationFields = inCollation.getFieldCollations().subList(0, searchKeys.size());
 
@@ -137,7 +133,7 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
                 convert(input, input.getTraitSet().replace(traitCollation)),
                 searchCollation,
                 filter.getCondition(),
-                idxCond
+                searchBounds
         );
 
         call.transformTo(res);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
index 98bb3fb39d..a24b277ce5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
@@ -105,7 +105,7 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                         index.type(),
                         rel.projects(),
                         rel.condition(),
-                        rel.indexConditions(),
+                        rel.searchBounds(),
                         rel.requiredColumns()
                     );
                 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/FilterScanMergeRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/FilterScanMergeRule.java
index db2b6fc523..8806029894 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/FilterScanMergeRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/FilterScanMergeRule.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.rule.logical;
 
 import java.util.Arrays;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPredicateList;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
@@ -29,6 +30,7 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.ignite.internal.sql.engine.rel.ProjectableFilterableTableScan;
 import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
@@ -84,9 +86,13 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
             condition = RexUtil.composeConjunction(builder, Arrays.asList(scan.condition(), condition));
         }
 
+        // We need to replace RexLocalRef with RexInputRef because "simplify" doesn't understand local refs.
+        condition = RexUtils.replaceLocalRefs(condition);
+        condition = new RexSimplify(builder, RelOptPredicateList.EMPTY, call.getPlanner().getExecutor())
+                .simplifyUnknownAsFalse(condition);
+
         // We need to replace RexInputRef with RexLocalRef because TableScan doesn't have inputs.
-        // TODO SEARCH support
-        condition = RexUtils.replaceInputRefs(RexUtil.expandSearch(builder, null, condition));
+        condition = RexUtils.replaceInputRefs(condition);
 
         // Set default traits, real traits will be calculated for physical node.
         RelTraitSet trait = cluster.traitSet();
@@ -104,7 +110,8 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteLogicalIndexScan createNode(
+        @Override
+        protected IgniteLogicalIndexScan createNode(
                 RelOptCluster cluster,
                 IgniteLogicalIndexScan scan,
                 RelTraitSet traits,
@@ -121,7 +128,8 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteLogicalTableScan createNode(
+        @Override
+        protected IgniteLogicalTableScan createNode(
                 RelOptCluster cluster,
                 IgniteLogicalTableScan scan,
                 RelTraitSet traits,
@@ -164,7 +172,7 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
                     .withOperandSupplier(b -> b.operand(LogicalFilter.class)
                             .predicate(p -> !skipCorrelated || !RexUtils.hasCorrelation(p.getCondition()))
                             .oneInput(b1 -> b1.operand(scanCls).noInputs()))
-                .as(Config.class);
+                    .as(Config.class);
         }
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
deleted file mode 100644
index ee26d604f3..0000000000
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.ints.IntSets;
-import java.util.List;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rex.RexNode;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Index conditions and bounds holder. Conditions are not printed to terms (serialized). They are used only to calculate selectivity.
- */
-public class IndexConditions {
-    private final List<RexNode> lowerCond;
-
-    private final List<RexNode> upperCond;
-
-    private final List<RexNode> lowerBound;
-
-    private final List<RexNode> upperBound;
-
-    /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public IndexConditions() {
-        this(null, null, null, null);
-    }
-
-    /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public IndexConditions(
-            @Nullable List<RexNode> lowerCond,
-            @Nullable List<RexNode> upperCond,
-            @Nullable List<RexNode> lowerBound,
-            @Nullable List<RexNode> upperBound
-    ) {
-        this.lowerCond = lowerCond;
-        this.upperCond = upperCond;
-        this.lowerBound = lowerBound;
-        this.upperBound = upperBound;
-    }
-
-    /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public IndexConditions(RelInput input) {
-        lowerCond = null;
-        upperCond = null;
-        lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
-        upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
-    }
-
-    /**
-     * Get lower index condition.
-     */
-    public List<RexNode> lowerCondition() {
-        return lowerCond;
-    }
-
-    /**
-     * Get upper index condition.
-     */
-    public List<RexNode> upperCondition() {
-        return upperCond;
-    }
-
-    /**
-     * Get lower index bounds (a row with values at the index columns).
-     */
-    public List<RexNode> lowerBound() {
-        return lowerBound;
-    }
-
-    /**
-     * Get upper index bounds (a row with values at the index columns).
-     */
-    public List<RexNode> upperBound() {
-        return upperBound;
-    }
-
-    /**
-     * Keys.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public IntSet keys() {
-        if (upperBound == null && lowerBound == null) {
-            return IntSets.EMPTY_SET;
-        }
-
-        IntSet keys = new IntOpenHashSet();
-
-        int cols = lowerBound != null ? lowerBound.size() : upperBound.size();
-
-        for (int i = 0; i < cols; ++i) {
-            if (upperBound != null && RexUtils.isNotNull(upperBound.get(i))
-                    || lowerBound != null && RexUtils.isNotNull(lowerBound.get(i))) {
-                keys.add(i);
-            }
-        }
-
-        return IntSets.unmodifiable(keys);
-    }
-
-    /**
-     * Describes index bounds.
-     *
-     * @param pw Plan writer.
-     * @return Plan writer for fluent-explain pattern.
-     */
-    public RelWriter explainTerms(RelWriter pw) {
-        return pw
-                .itemIf("lower", lowerBound, !nullOrEmpty(lowerBound))
-                .itemIf("upper", upperBound, !nullOrEmpty(upperBound));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return "IndexConditions{"
-                + "lowerCond=" + lowerCond
-                + ", upperCond=" + upperCond
-                + ", lowerBound=" + lowerBound
-                + ", upperBound=" + upperBound
-                + '}';
-    }
-}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
index f5ccb2c5a7..bad5f0abbf 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
@@ -17,18 +17,25 @@
 
 package org.apache.ignite.internal.sql.engine.util;
 
+import static org.apache.calcite.rex.RexUtil.removeCast;
+import static org.apache.calcite.rex.RexUtil.sargRef;
 import static org.apache.calcite.sql.SqlKind.EQUALS;
 import static org.apache.calcite.sql.SqlKind.GREATER_THAN;
 import static org.apache.calcite.sql.SqlKind.GREATER_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.IS_NOT_NULL;
+import static org.apache.calcite.sql.SqlKind.IS_NULL;
 import static org.apache.calcite.sql.SqlKind.LESS_THAN;
 import static org.apache.calcite.sql.SqlKind.LESS_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.SqlKind.SEARCH;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -59,6 +66,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexSlot;
+import org.apache.calcite.rex.RexUnknownAs;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
@@ -69,9 +77,14 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Sarg;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.mapping.MappingType;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.MultiBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
@@ -81,6 +94,9 @@ import org.jetbrains.annotations.Nullable;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class RexUtils {
+    /** Maximum number of search bounds tuples per scan. */
+    public static final int MAX_SEARCH_BOUNDS_COMPLEXITY = 100;
+
     /**
      * MakeCast.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -179,17 +195,24 @@ public class RexUtils {
         return true;
     }
 
+    /** Binary comparison operations. */
+    private static final Set<SqlKind> BINARY_COMPARISON =
+            EnumSet.of(EQUALS, LESS_THAN, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL);
+
     /** Supported index operations. */
     private static final Set<SqlKind> TREE_INDEX_COMPARISON =
             EnumSet.of(
+                    SEARCH,
+                    IS_NULL,
+                    IS_NOT_NULL,
                     EQUALS,
                     LESS_THAN, GREATER_THAN,
                     GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL);
 
     /**
-     * Builds index conditions.
+     * Builds sorted index search bounds.
      */
-    public static IndexConditions buildSortedIndexConditions(
+    public static List<SearchBounds> buildSortedIndexConditions(
             RelOptCluster cluster,
             RelCollation collation,
             RexNode condition,
@@ -197,37 +220,46 @@ public class RexUtils {
             ImmutableBitSet requiredColumns
     ) {
         if (condition == null) {
-            return new IndexConditions();
+            return List.of();
         }
 
         condition = RexUtil.toCnf(builder(cluster), condition);
 
-        Int2ObjectOpenHashMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
+        Int2ObjectMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
 
         if (nullOrEmpty(fieldsToPredicates)) {
-            return new IndexConditions();
+            return List.of();
         }
 
-        List<RexNode> lower = new ArrayList<>();
-        List<RexNode> upper = new ArrayList<>();
-
         // Force collation for all fields of the condition.
         if (collation == null || collation.isDefault()) {
-            List<Integer> equalsFields = new ArrayList<>(fieldsToPredicates.size());
-            List<Integer> otherFields = new ArrayList<>(fieldsToPredicates.size());
+            IntArrayList fields = new IntArrayList(fieldsToPredicates.size());
+            IntArrayList lastFields = new IntArrayList(fieldsToPredicates.size());
 
             // It's more effective to put equality conditions in the collation first.
-            fieldsToPredicates.forEach((idx, conds) ->
-                    (conds.stream().allMatch(call -> call.getOperator().getKind() == EQUALS) ? equalsFields : otherFields).add(idx));
+            fieldsToPredicates.int2ObjectEntrySet()
+                    .forEach(entry -> {
+                        (entry.getValue().stream().anyMatch(v -> v.getOperator().getKind() == EQUALS) ? fields : lastFields)
+                                .add(entry.getIntKey());
+                    });
+            fields.addAll(lastFields);
+
+            collation = TraitUtils.createCollation(fields);
+        }
 
-            equalsFields.addAll(otherFields);
+        List<RelDataType> types = RelOptUtil.getFieldTypeList(rowType);
+
+        Mappings.TargetMapping mapping = null;
 
-            collation = TraitUtils.createCollation(equalsFields);
+        if (requiredColumns != null) {
+            mapping = Commons.inverseMapping(requiredColumns, types.size());
         }
 
-        for (int i = 0; i < collation.getFieldCollations().size(); i++) {
-            RelFieldCollation fc = collation.getFieldCollations().get(i);
+        List<SearchBounds> bounds = Arrays.asList(new SearchBounds[types.size()]);
+        boolean boundsEmpty = true;
+        int prevComplexity = 1;
 
+        for (RelFieldCollation fc : collation.getFieldCollations()) {
             int collFldIdx = fc.getFieldIndex();
 
             List<RexCall> collFldPreds = fieldsToPredicates.get(collFldIdx);
@@ -236,102 +268,41 @@ public class RexUtils {
                 break;
             }
 
-            RexNode bestUpper = null;
-            RexNode bestLower = null;
-
-            for (RexCall pred : collFldPreds) {
-                if (IgniteUtils.assertionsEnabled()) {
-                    RexNode cond = RexUtil.removeCast(pred.operands.get(1));
-
-                    assert idxOpSupports(cond) : cond;
-                }
-
-                boolean lowerBoundBelow = !fc.getDirection().isDescending();
-                SqlOperator op = pred.getOperator();
-                switch (op.kind) {
-                    case EQUALS:
-                        bestUpper = pred;
-                        bestLower = pred;
-                        break;
-
-                    case LESS_THAN:
-                    case LESS_THAN_OR_EQUAL:
-                        lowerBoundBelow = !lowerBoundBelow;
-                        if (lowerBoundBelow) {
-                            bestLower = pred;
-                        } else {
-                            bestUpper = pred;
-                        }
-                        break;
-
-                    case GREATER_THAN:
-                    case GREATER_THAN_OR_EQUAL:
-                        if (lowerBoundBelow) {
-                            bestLower = pred;
-                        } else {
-                            bestUpper = pred;
-                        }
-                        break;
+            if (mapping != null) {
+                collFldIdx = mapping.getSourceOpt(collFldIdx);
+            }
 
-                    default:
-                        throw new AssertionError("Unknown condition: " + op.kind);
-                }
+            SearchBounds fldBounds = createBounds(fc, collFldPreds, cluster, types.get(collFldIdx), prevComplexity);
 
-                if (bestUpper != null && bestLower != null) {
-                    break; // We've found either "=" condition or both lower and upper.
-                }
+            if (fldBounds == null) {
+                break;
             }
 
-            if (bestLower == null && bestUpper == null) {
-                break; // No bounds, so break the loop.
-            }
+            boundsEmpty = false;
 
-            if (bestLower != null && bestUpper != null) { // "x>5 AND x<10"
-                upper.add(bestUpper);
-                lower.add(bestLower);
+            bounds.set(collFldIdx, fldBounds);
 
-                if (bestLower != bestUpper) {
+            if (fldBounds instanceof MultiBounds) {
+                prevComplexity *= ((MultiBounds) fldBounds).bounds().size();
+
+                // No bounds are allowed after multi-bounds with a non-exact element, since that can cause interval intersections.
+                if (((MultiBounds) fldBounds).bounds().stream().anyMatch(b -> b.type() != SearchBounds.Type.EXACT)) {
                     break;
                 }
-            } else if (bestLower != null) { // "x>5"
-                lower.add(bestLower);
-
-                break; // TODO https://issues.apache.org/jira/browse/IGNITE-13568
-            } else { // "x<10"
-                upper.add(bestUpper);
+            }
 
+            if (fldBounds.type() == SearchBounds.Type.RANGE) {
                 break; // TODO https://issues.apache.org/jira/browse/IGNITE-13568
             }
         }
 
-        Mappings.TargetMapping mapping = null;
-
-        if (requiredColumns != null) {
-            mapping = Commons.inverseMapping(requiredColumns, rowType.getFieldCount());
-        }
-
-        List<RexNode> lowerBound = null;
-        List<RexNode> upperBound = null;
-
-        if (!nullOrEmpty(lower)) {
-            lowerBound = asBound(cluster, lower, rowType, mapping);
-        } else {
-            lower = null;
-        }
-
-        if (!nullOrEmpty(upper)) {
-            upperBound = asBound(cluster, upper, rowType, mapping);
-        } else {
-            upper = null;
-        }
-
-        return new IndexConditions(lower, upper, lowerBound, upperBound);
+        return boundsEmpty ? List.of() : bounds;
     }
 
     /**
-     * Builds index conditions.
+     * Builds hash index search bounds.
      */
-    public static IndexConditions buildHashIndexConditions(
+    public static List<SearchBounds> buildHashIndexConditions(
             RelOptCluster cluster,
             List<String> indexedColumns,
             RexNode condition,
@@ -339,18 +310,18 @@ public class RexUtils {
             ImmutableBitSet requiredColumns
     ) {
         if (condition == null) {
-            return new IndexConditions();
+            return List.of();
         }
 
         condition = RexUtil.toCnf(builder(cluster), condition);
 
-        Int2ObjectOpenHashMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
+        Int2ObjectMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
 
         if (nullOrEmpty(fieldsToPredicates)) {
-            return new IndexConditions();
+            return List.of();
         }
 
-        List<RexNode> searchCondition = new ArrayList<>();
+        List<SearchBounds> bounds = Arrays.asList(new SearchBounds[rowType.getFieldCount()]);
 
         Mappings.TargetMapping toTrimmedRowMapping = null;
         if (requiredColumns != null) {
@@ -361,7 +332,7 @@ public class RexUtils {
             RelDataTypeField field = rowType.getField(columnName, true, false);
 
             if (field == null) {
-                return new IndexConditions();
+                return List.of();
             }
 
             int collFldIdx = toTrimmedRowMapping == null ? field.getIndex() : toTrimmedRowMapping.getTargetOpt(field.getIndex());
@@ -369,43 +340,21 @@ public class RexUtils {
             List<RexCall> collFldPreds = fieldsToPredicates.get(collFldIdx);
 
             if (nullOrEmpty(collFldPreds)) {
-                return new IndexConditions();
+                return List.of(); // Hash index can't be applied to partial condition.
             }
 
-            RexNode columnPred = null;
-
-            for (RexCall pred : collFldPreds) {
-                if (IgniteUtils.assertionsEnabled()) {
-                    RexNode cond = RexUtil.removeCast(pred.operands.get(1));
-
-                    assert idxOpSupports(cond) : cond;
-                }
-
-                SqlOperator op = pred.getOperator();
-
-                if (op.kind == EQUALS) {
-                    columnPred = pred;
-
-                    break;
-                }
-            }
+            RexCall columnPred = collFldPreds.stream()
+                    .filter(pred -> pred.getOperator().getKind() == EQUALS)
+                    .findAny().orElse(null);
 
             if (columnPred == null) {
-                return new IndexConditions();
+                return List.of();
             }
 
-            searchCondition.add(columnPred);
+            bounds.set(collFldIdx, createBounds(null, Collections.singletonList(columnPred), cluster, field.getType(), 1));
         }
 
-        Mappings.TargetMapping mapping = null;
-
-        if (requiredColumns != null) {
-            mapping = Commons.inverseMapping(requiredColumns, rowType.getFieldCount());
-        }
-
-        List<RexNode> searchRow = asBound(cluster, searchCondition, rowType, mapping);
-
-        return new IndexConditions(null, null, searchRow, searchRow);
+        return bounds;
     }
 
     /**
@@ -418,31 +367,22 @@ public class RexUtils {
     ) {
         condition = RexUtil.toCnf(builder(cluster), condition);
 
-        Int2ObjectOpenHashMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
+        Int2ObjectMap<List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
 
         if (nullOrEmpty(fieldsToPredicates)) {
-            return null;
+            return List.of();
         }
 
         List<RexNode> searchPreds = null;
 
-        ObjectIterator<List<RexCall>> iterator = fieldsToPredicates.values().iterator();
-        while (iterator.hasNext()) {
-            List<RexCall> collFldPreds = iterator.next();
-
+        for (List<RexCall> collFldPreds : fieldsToPredicates.values()) {
             if (nullOrEmpty(collFldPreds)) {
                 break;
             }
 
             for (RexCall pred : collFldPreds) {
-                if (IgniteUtils.assertionsEnabled()) {
-                    RexNode cond = RexUtil.removeCast(pred.operands.get(1));
-
-                    assert idxOpSupports(cond) : cond;
-                }
-
                 if (pred.getOperator().kind != SqlKind.EQUALS) {
-                    return null;
+                    return List.of();
                 }
 
                 if (searchPreds == null) {
@@ -454,54 +394,215 @@ public class RexUtils {
         }
 
         if (searchPreds == null) {
-            return null;
+            return List.of();
         }
 
         return asBound(cluster, searchPreds, rowType, null);
     }
 
-    private static Int2ObjectOpenHashMap<List<RexCall>> mapPredicatesToFields(RexNode condition, RelOptCluster cluster) {
+    /** Creates search bounds for one index field from its predicates, or returns {@code null} if none can be derived. */
+    private static @Nullable SearchBounds createBounds(
+            @Nullable RelFieldCollation fc, // Can be null for EQUALS condition.
+            List<RexCall> collFldPreds,
+            RelOptCluster cluster,
+            RelDataType fldType,
+            int prevComplexity
+    ) {
+        RexBuilder builder = builder(cluster);
+
+        RexNode nullVal = builder.makeNullLiteral(fldType);
+
+        RexNode upperCond = null;
+        RexNode lowerCond = null;
+        RexNode upperBound = null;
+        RexNode lowerBound = null;
+        boolean upperInclude = true;
+        boolean lowerInclude = true;
+
+        for (RexCall pred : collFldPreds) {
+            RexNode val = null;
+
+            if (isBinaryComparison(pred)) {
+                val = removeCast(pred.operands.get(1));
+
+                assert idxOpSupports(val) : val;
+
+                val = makeCast(builder, val, fldType);
+            }
+
+            SqlOperator op = pred.getOperator();
+
+            if (op.kind == EQUALS) {
+                return new ExactBounds(pred, val);
+            } else if (op.kind == IS_NULL) {
+                return new ExactBounds(pred, nullVal);
+            } else if (op.kind == SEARCH) {
+                Sarg<?> sarg = ((RexLiteral) pred.operands.get(1)).getValueAs(Sarg.class);
+
+                int complexity = prevComplexity * sarg.complexity();
+
+                // Limit the number of search bounds tuples.
+                if (complexity > MAX_SEARCH_BOUNDS_COMPLEXITY) {
+                    return null;
+                }
+
+                RexNode sargCond = sargRef(builder, pred.operands.get(0), sarg, fldType, RexUnknownAs.UNKNOWN);
+                List<RexNode> disjunctions = RelOptUtil.disjunctions(RexUtil.toDnf(builder, sargCond));
+                List<SearchBounds> bounds = new ArrayList<>(disjunctions.size());
+
+                for (RexNode bound : disjunctions) {
+                    List<RexNode> conjunctions = RelOptUtil.conjunctions(bound);
+                    List<RexCall> calls = new ArrayList<>(conjunctions.size());
+
+                    for (RexNode rexNode : conjunctions) {
+                        if (isSupportedTreeComparison(rexNode)) {
+                            calls.add((RexCall) rexNode);
+                        } else {
+                            return null; // Cannot filter using this predicate (NOT_EQUALS for example).
+                        }
+                    }
+
+                    bounds.add(createBounds(fc, calls, cluster, fldType, complexity));
+                }
+
+                if (bounds.size() == 1) {
+                    return bounds.get(0);
+                }
+
+                return new MultiBounds(pred, bounds);
+            }
+
+            // Range bounds.
+            boolean lowerBoundBelow = !fc.getDirection().isDescending();
+            switch (op.kind) {
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                    lowerBoundBelow = !lowerBoundBelow;
+                    // fallthrough.
+
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                    if (lowerBoundBelow) {
+                        lowerCond = pred;
+                        lowerBound = val;
+
+                        // Always reassign: a flag cleared by an earlier predicate must not leak onto this bound.
+                        lowerInclude = op.kind == GREATER_THAN_OR_EQUAL
+                                || op.kind == LESS_THAN_OR_EQUAL;
+                    } else {
+                        upperCond = pred;
+                        upperBound = val;
+
+                        // Always reassign: a flag cleared by an earlier predicate must not leak onto this bound.
+                        upperInclude = op.kind == GREATER_THAN_OR_EQUAL
+                                || op.kind == LESS_THAN_OR_EQUAL;
+                    }
+                    // fallthrough.
+
+                case IS_NOT_NULL:
+                    if (fc.nullDirection == RelFieldCollation.NullDirection.FIRST && lowerBound == null) {
+                        lowerCond = pred;
+                        lowerBound = nullVal;
+                        lowerInclude = false;
+                    } else if (fc.nullDirection == RelFieldCollation.NullDirection.LAST && upperBound == null) {
+                        upperCond = pred;
+                        upperBound = nullVal;
+                        upperInclude = false;
+                    }
+                    break;
+
+                default:
+                    throw new AssertionError("Unknown condition: " + op.kind);
+            }
+        }
+
+        if (lowerBound == null && upperBound == null) {
+            return null; // No bounds.
+        }
+
+        // Found upper bound, lower bound or both.
+        RexNode cond = lowerCond == null ? upperCond :
+                upperCond == null ? lowerCond :
+                        upperCond == lowerCond ? lowerCond : builder.makeCall(SqlStdOperatorTable.AND, lowerCond, upperCond);
+
+        return new RangeBounds(cond, lowerBound, upperBound, lowerInclude, upperInclude);
+    }
+
+    private static Int2ObjectMap<List<RexCall>> mapPredicatesToFields(RexNode condition, RelOptCluster cluster) {
         List<RexNode> conjunctions = RelOptUtil.conjunctions(condition);
 
-        Int2ObjectOpenHashMap<List<RexCall>> res = new Int2ObjectOpenHashMap<>(conjunctions.size());
+        if (conjunctions.isEmpty()) {
+            return Int2ObjectMaps.emptyMap();
+        }
+
+        Int2ObjectMap<List<RexCall>> res = new Int2ObjectOpenHashMap<>(conjunctions.size());
 
         for (RexNode rexNode : conjunctions) {
-            if (!isBinaryComparison(rexNode)) {
+            if (!isSupportedTreeComparison(rexNode)) {
                 continue;
             }
 
             RexCall predCall = (RexCall) rexNode;
-            RexSlot ref = (RexSlot) extractRef(predCall);
+            RexSlot ref;
 
-            if (ref == null) {
-                continue;
-            }
+            if (isBinaryComparison(rexNode)) {
+                ref = (RexSlot) extractRefFromBinary(predCall, cluster);
+
+                if (ref == null) {
+                    continue;
+                }
 
-            // Let RexLocalRef be on the left side.
-            if (refOnTheRight(predCall)) {
-                predCall = (RexCall) RexUtil.invert(builder(cluster), predCall);
+                // Let RexLocalRef be on the left side.
+                if (refOnTheRight(predCall)) {
+                    predCall = (RexCall) RexUtil.invert(builder(cluster), predCall);
+                }
+            } else {
+                ref = (RexSlot) extractRefFromOperand(predCall, cluster, 0);
+
+                if (ref == null) {
+                    continue;
+                }
             }
 
             List<RexCall> fldPreds = res.computeIfAbsent(ref.getIndex(), k -> new ArrayList<>(conjunctions.size()));
 
             fldPreds.add(predCall);
         }
+
         return res;
     }
 
-    private static RexNode extractRef(RexCall call) {
+    private static RexNode extractRefFromBinary(RexCall call, RelOptCluster cluster) {
         assert isBinaryComparison(call);
 
-        RexNode leftOp = call.getOperands().get(0);
+        RexNode leftRef = extractRefFromOperand(call, cluster, 0);
         RexNode rightOp = call.getOperands().get(1);
 
-        leftOp = RexUtil.removeCast(leftOp);
-        rightOp = RexUtil.removeCast(rightOp);
+        if (leftRef != null) {
+            return idxOpSupports(removeCast(rightOp)) ? leftRef : null;
+        }
 
-        if ((leftOp instanceof RexLocalRef || leftOp instanceof RexInputRef) && idxOpSupports(rightOp)) {
-            return leftOp;
-        } else if ((rightOp instanceof RexLocalRef || rightOp instanceof RexInputRef) && idxOpSupports(leftOp)) {
-            return rightOp;
+        RexNode rightRef = extractRefFromOperand(call, cluster, 1);
+        RexNode leftOp = call.getOperands().get(0);
+
+        if (rightRef != null) {
+            return idxOpSupports(removeCast(leftOp)) ? rightRef : null;
+        }
+
+        return null;
+    }
+
+    private static RexNode extractRefFromOperand(RexCall call, RelOptCluster cluster, int operandNum) {
+        assert isSupportedTreeComparison(call);
+
+        RexNode op = call.getOperands().get(operandNum);
+
+        op = removeCast(op);
+
+        // Can proceed without ref cast only if cast was redundant in terms of values comparison.
+        if ((op instanceof RexSlot)
+                && !TypeUtils.needCast(cluster.getTypeFactory(), op.getType(), call.getOperands().get(operandNum).getType())) {
+            return op;
         }
 
         return null;
@@ -515,8 +616,15 @@ public class RexUtils {
         return rightOp.isA(SqlKind.LOCAL_REF) || rightOp.isA(SqlKind.INPUT_REF);
     }
 
-    public static boolean isBinaryComparison(RexNode exp) {
-        return TREE_INDEX_COMPARISON.contains(exp.getKind()) && (exp instanceof RexCall) && ((RexCall) exp).getOperands().size() == 2;
+    private static boolean isBinaryComparison(RexNode exp) { // True for a two-operand call whose kind is in BINARY_COMPARISON.
+        return BINARY_COMPARISON.contains(exp.getKind())
+                && (exp instanceof RexCall)
+                && ((RexCall) exp).getOperands().size() == 2; // Exactly two operands are required.
+    }
+
+    private static boolean isSupportedTreeComparison(RexNode exp) { // True for any call whose kind is in TREE_INDEX_COMPARISON.
+        return TREE_INDEX_COMPARISON.contains(exp.getKind())
+                && (exp instanceof RexCall); // Unlike isBinaryComparison, the operand count is not checked here.
     }
 
     private static boolean idxOpSupports(RexNode op) {
@@ -541,6 +649,7 @@ public class RexUtils {
      * AsBound.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
+    @Deprecated(forRemoval = true)
     public static List<RexNode> asBound(RelOptCluster cluster, Iterable<RexNode> idxCond, RelDataType rowType,
             @Nullable Mappings.TargetMapping mapping) {
         if (nullOrEmpty(idxCond)) {
@@ -629,9 +738,9 @@ public class RexUtils {
     }
 
     /**
-    * Get hasCorrelation flag.
-    * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-    */
+     * Get hasCorrelation flag.
+     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
     public static boolean hasCorrelation(List<RexNode> nodes) {
         try {
             RexVisitor<Void> v = new RexVisitorImpl<Void>(true) {
@@ -650,9 +759,9 @@ public class RexUtils {
     }
 
     /**
-    * ExtractCorrelationIds.
-    * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-    */
+     * ExtractCorrelationIds.
+     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     */
     public static Set<CorrelationId> extractCorrelationIds(RexNode node) {
         if (node == null) {
             return Collections.emptySet();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index e96d23b0bb..e743e60eb1 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -83,7 +83,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
                     Object[] lower = generateFindRow(rowIdLow, testIdx.getKey(), notUnique, testIdx.getValue());
                     Object[] upper = generateFindRow(rowIdUp, testIdx.getKey(), notUnique, testIdx.getValue());
 
-                    Cursor<Object[]> cur = idx0.find(lower, upper);
+                    Cursor<Object[]> cur = idx0.find(lower, upper, true, true);
 
                     int rows = 0;
                     while (cur.hasNext()) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 960d9393ce..7fae73256b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscription;
-import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
@@ -41,6 +40,8 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
@@ -92,22 +93,22 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
         // Validate bounds.
         validateSortedIndexScan(
                 tableData,
-                () -> new Object[]{2, 1},
-                () -> new Object[]{3, 0},
+                new Object[]{2, 1},
+                new Object[]{3, 0},
                 result
         );
 
         validateSortedIndexScan(
                 tableData,
-                () -> new Object[]{2, 1},
-                () -> new Object[]{4},
+                new Object[]{2, 1},
+                new Object[]{4},
                 result
 
         );
 
         validateSortedIndexScan(
                 tableData,
-                () -> new Object[]{null},
+                new Object[]{null},
                 null,
                 result
         );
@@ -116,8 +117,8 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
         IgniteTestUtils.assertThrowsWithCause(() ->
                 validateSortedIndexScan(
                         tableData,
-                        () -> new Object[]{2, "Brutus"},
-                        () -> new Object[]{3.9, 0},
+                        new Object[]{2, "Brutus"},
+                        new Object[]{3.9, 0},
                         // TODO: sort data, once IndexScanNode will support merging.
                         EMPTY
                 ), ClassCastException.class);
@@ -152,31 +153,31 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
         // Validate bounds.
         validateHashIndexScan(
                 tableData,
-                () -> new Object[]{4, 2},
+                new Object[]{4, 2},
                 tableData);
 
         validateHashIndexScan(
                 tableData,
-                () -> new Object[]{null, null},
+                new Object[]{null, null},
                 tableData);
 
         // Validate failure due to incorrect bounds.
         IgniteTestUtils.assertThrowsWithCause(() ->
                 validateHashIndexScan(
                         tableData,
-                        () -> new Object[]{2},
+                        new Object[]{2},
                         EMPTY
                 ), AssertionError.class, "Invalid lookup condition");
 
         IgniteTestUtils.assertThrowsWithCause(() ->
                 validateHashIndexScan(
                         tableData,
-                        () -> new Object[]{2, "Brutus"},
+                        new Object[]{2, "Brutus"},
                         EMPTY
                 ), ClassCastException.class);
     }
 
-    private void validateHashIndexScan(Object[][] tableData, @Nullable Supplier<Object[]> key, Object[][] expRes) {
+    private void validateHashIndexScan(Object[][] tableData, @Nullable Object[] key, Object[][] expRes) {
         SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
                 1,
                 new Column[]{new Column("key", NativeTypes.INT64, false)},
@@ -214,8 +215,8 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
 
     private void validateSortedIndexScan(
             Object[][] tableData,
-            Supplier<Object[]> lowerBound,
-            Supplier<Object[]> upperBound,
+            Object[] lowerBound,
+            Object[] upperBound,
             Object[][] expectedData
     ) {
         SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
@@ -264,22 +265,37 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
             Object[][] tableData,
             SchemaDescriptor schemaDescriptor,
             IgniteIndex index,
-            Supplier<Object[]> lowerBound,
-            Supplier<Object[]> upperBound,
+            Object[] lowerBound,
+            Object[] upperBound,
             Object[][] expectedData
     ) {
         ExecutionContext<Object[]> ectx = executionContext(true);
 
         RelDataType rowType = createRowTypeFromSchema(ectx.getTypeFactory(), schemaDescriptor);
 
+        RangeIterable<Object[]> rangeIterable = null;
+
+        if (lowerBound != null || upperBound != null) {
+            RangeCondition<Object[]> range = Mockito.mock(RangeCondition.class);
+
+            Mockito.doReturn(lowerBound).when(range).lower();
+            Mockito.doReturn(upperBound).when(range).upper();
+            Mockito.doReturn(true).when(range).lowerInclude();
+            Mockito.doReturn(true).when(range).upperInclude();
+
+            rangeIterable = Mockito.mock(RangeIterable.class);
+
+            Mockito.doReturn(1).when(rangeIterable).size();
+            Mockito.doAnswer(inv -> List.of(range).iterator()).when(rangeIterable).iterator();
+        }
+
         IndexScanNode<Object[]> scanNode = new IndexScanNode<>(
                 ectx,
                 rowType,
                 index,
                 new TestTable(rowType),
                 new int[]{0, 2},
-                lowerBound,
-                upperBound,
+                rangeIterable,
                 null,
                 null,
                 null
@@ -307,7 +323,6 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
         return rowTypeBuilder.build();
     }
 
-
     private BinaryTuple[] partitionData(Object[][] tableData, SchemaDescriptor schemaDescriptor, int partition) {
         BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.createRowSchema(schemaDescriptor);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
index e03b62b9b7..a081582e09 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Predicate;
@@ -30,6 +31,8 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -125,8 +128,7 @@ public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest {
                     RelCollations.of(ImmutableIntList.of(0)),
                     (o1, o2) -> o1[0] != null ? ((Comparable) o1[0]).compareTo(o2[0]) : 0,
                     testFilter,
-                    () -> lower,
-                    () -> upper
+                    new StaticRangeIterable(lower, upper)
             );
 
             spool.register(singletonList(scan));
@@ -172,8 +174,7 @@ public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest {
                 collation,
                 ctx.expressionFactory().comparator(collation),
                 v -> true,
-                () -> lower,
-                () -> upper
+                new StaticRangeIterable(lower, upper)
         );
 
         spool.register(scan);
@@ -240,4 +241,49 @@ public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest {
             this.expectedResultSize = expectedResultSize;
         }
     }
+
+    private static class StaticRangeIterable implements RangeIterable<Object[]> { // Test helper: one fixed lower/upper range.
+        private final Object[] lower; // Returned as-is by the range's lower().
+
+        private final Object[] upper; // Returned as-is by the range's upper().
+
+        private StaticRangeIterable(Object[] lower, Object[] upper) {
+            this.lower = lower;
+            this.upper = upper;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int size() {
+            return 1; // Matches iterator(), which yields exactly one range.
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Iterator<RangeCondition<Object[]>> iterator() {
+            RangeCondition<Object[]> range = new RangeCondition<Object[]>() { // New condition object on every call.
+                @Override
+                public Object[] lower() {
+                    return lower;
+                }
+
+                @Override
+                public Object[] upper() {
+                    return upper;
+                }
+
+                @Override
+                public boolean lowerInclude() {
+                    return true; // Lower bound is always reported as included.
+                }
+
+                @Override
+                public boolean upperInclude() {
+                    return true; // Upper bound is always reported as included.
+                }
+            };
+
+            return Collections.singleton(range).iterator(); // Single-element iterator over the condition built above.
+        }
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index 8dc73bce4c..53bce77d6d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -26,7 +26,8 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -98,28 +99,19 @@ public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
 
         IgniteIndexScan idxScan = findFirstNode(phys, byClass(IgniteIndexScan.class));
 
-        List<RexNode> lowerBound = idxScan.lowerBound();
+        List<SearchBounds> searchBounds = idxScan.searchBounds();
 
-        assertNotNull(lowerBound, "Invalid plan\n" + RelOptUtil.toString(phys));
-        assertEquals(3, lowerBound.size());
+        assertNotNull(searchBounds, "Invalid plan\n" + RelOptUtil.toString(phys));
+        assertEquals(3, searchBounds.size());
 
-        assertNull(lowerBound.get(0));
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
-        assertNull(lowerBound.get(2));
-
-        List<RexNode> upperBound = idxScan.upperBound();
-
-        assertNotNull(upperBound, "Invalid plan\n" + RelOptUtil.toString(phys));
-        assertEquals(3, upperBound.size());
-
-        assertNull(upperBound.get(0));
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
-        assertNull(upperBound.get(2));
+        assertNull(searchBounds.get(0));
+        assertTrue(searchBounds.get(1) instanceof ExactBounds);
+        assertTrue(((ExactBounds) searchBounds.get(1)).bound() instanceof RexFieldAccess);
+        assertNull(searchBounds.get(2));
     }
 
     /**
-     * Check join with not equi condition. Current implementation of the CorrelatedNestedLoopJoinTest is not applicable
-     * for such case.
+     * Check join with non-equi condition. Current implementation of the CorrelatedNestedLoopJoinTest is not applicable in such a case.
      */
     @Test
     public void testInvalidIndexExpressions() throws Exception {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
index 2f2b36a105..600d46a7f4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
@@ -17,11 +17,16 @@
 
 package org.apache.ignite.internal.sql.engine.planner;
 
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
@@ -94,8 +99,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
                 .and(scan -> scan.condition() != null)
                 .and(scan -> "=($t2, 0)".equals(scan.condition().toString()))
                 .and(scan -> ImmutableBitSet.of(0, 1, 2).equals(scan.requiredColumns()))
-                .and(scan -> "[=($t2, 0)]".equals(scan.indexConditions().lowerCondition().toString()))
-                .and(scan -> "[=($t2, 0)]".equals(scan.indexConditions().upperCondition().toString()))
+                .and(scan -> "[=($t2, 0)]".equals(searchBoundsCondition(scan.searchBounds()).toString()))
         );
 
         // Index condition shifted according to requiredColumns.
@@ -105,8 +109,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
                 .and(scan -> scan.condition() != null)
                 .and(scan -> "=($t1, 0)".equals(scan.condition().toString()))
                 .and(scan -> ImmutableBitSet.of(1, 2).equals(scan.requiredColumns()))
-                .and(scan -> "[=($t1, 0)]".equals(scan.indexConditions().lowerCondition().toString()))
-                .and(scan -> "[=($t1, 0)]".equals(scan.indexConditions().upperCondition().toString()))
+                .and(scan -> "[=($t1, 0)]".equals(searchBoundsCondition(scan.searchBounds()).toString()))
         );
     }
 
@@ -122,8 +125,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
                 .and(scan -> scan.condition() != null)
                 .and(scan -> "=($t2, 0)".equals(scan.condition().toString()))
                 .and(scan -> ImmutableBitSet.of(0, 1, 2).equals(scan.requiredColumns()))
-                .and(scan -> "[=($t2, 0)]".equals(scan.indexConditions().lowerCondition().toString()))
-                .and(scan -> "[=($t2, 0)]".equals(scan.indexConditions().upperCondition().toString()))
+                .and(scan -> "[=($t2, 0)]".equals(searchBoundsCondition(scan.searchBounds()).toString()))
         );
 
         // Index condition shift and identity.
@@ -132,8 +134,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
                 .and(scan -> scan.condition() != null)
                 .and(scan -> "=($t1, 0)".equals(scan.condition().toString()))
                 .and(scan -> ImmutableBitSet.of(1, 2).equals(scan.requiredColumns()))
-                .and(scan -> "[=($t1, 0)]".equals(scan.indexConditions().lowerCondition().toString()))
-                .and(scan -> "[=($t1, 0)]".equals(scan.indexConditions().upperCondition().toString()))
+                .and(scan -> "[=($t1, 0)]".equals(searchBoundsCondition(scan.searchBounds()).toString()))
         );
     }
 
@@ -237,4 +238,11 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
                         .and(scan -> ImmutableBitSet.of(0, 2).equals(scan.requiredColumns())),
                 "ProjectFilterTransposeRule", "FilterProjectTransposeRule");
     }
+
+    /**
+     * Collects the {@code condition()} of every non-null search bound, in list order; null entries are skipped.
+     */
+    private static List<RexNode> searchBoundsCondition(List<SearchBounds> searchBounds) {
+        return searchBounds.stream().filter(Objects::nonNull).map(SearchBounds::condition).collect(Collectors.toList());
+    }
 }
\ No newline at end of file
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index b41d83067b..cebf23eda7 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,8 +28,11 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.ignite.internal.index.ColumnCollation;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
+import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
@@ -46,8 +50,7 @@ import org.junit.jupiter.api.Test;
  */
 public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
     /**
-     * Check equi-join on not colocated fields. CorrelatedNestedLoopJoinTest is applicable for this case only with
-     * IndexSpool.
+     * Check equi-join on non-colocated fields. CorrelatedNestedLoopJoinTest is applicable to this case only with IndexSpool.
      */
     @Test
     public void testNotColocatedEqJoin() throws Exception {
@@ -100,28 +103,19 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
 
         IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
 
-        List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
+        List<SearchBounds> searchBounds = idxSpool.searchBounds();
 
-        assertNotNull(lowerBound);
-        assertEquals(3, lowerBound.size());
+        assertNotNull(searchBounds, "Invalid plan\n" + RelOptUtil.toString(phys));
+        assertEquals(3, searchBounds.size());
 
-        assertNull(lowerBound.get(0));
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
-        assertNull(lowerBound.get(2));
-
-        List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
-
-        assertNotNull(upperBound);
-        assertEquals(3, upperBound.size());
-
-        assertNull(upperBound.get(0));
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
-        assertNull(upperBound.get(2));
+        assertNull(searchBounds.get(0));
+        assertTrue(searchBounds.get(1) instanceof ExactBounds);
+        assertTrue(((ExactBounds) searchBounds.get(1)).bound() instanceof RexFieldAccess);
+        assertNull(searchBounds.get(2));
     }
 
     /**
-     * Check case when exists index (collation) isn't applied not for whole join condition but may be used by part of
-     * condition.
+     * Check the case when an existing index (collation) can't be applied to the whole join condition but may be used by part of it.
      */
     @Test
     public void testPartialIndexForCondition() throws Exception {
@@ -177,25 +171,16 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
 
         IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
 
-        List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
-
-        assertNotNull(lowerBound);
-        assertEquals(4, lowerBound.size());
-
-        assertNull(lowerBound.get(0));
-        assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
-        assertNull(lowerBound.get(2));
-        assertNull(lowerBound.get(3));
+        List<SearchBounds> searchBounds = idxSpool.searchBounds();
 
-        List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
+        assertNotNull(searchBounds, "Invalid plan\n" + RelOptUtil.toString(phys));
+        assertEquals(4, searchBounds.size());
 
-        assertNotNull(upperBound);
-        assertEquals(4, upperBound.size());
-
-        assertNull(upperBound.get(0));
-        assertTrue(upperBound.get(1) instanceof RexFieldAccess);
-        assertNull(lowerBound.get(2));
-        assertNull(lowerBound.get(3));
+        assertNull(searchBounds.get(0));
+        assertTrue(searchBounds.get(1) instanceof ExactBounds);
+        assertTrue(((ExactBounds) searchBounds.get(1)).bound() instanceof RexFieldAccess);
+        assertNull(searchBounds.get(2));
+        assertNull(searchBounds.get(3));
     }
 
     /**
@@ -209,7 +194,7 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
                         .addIndex("t0_jid_idx", 1),
                 createTable("T1", 100, IgniteDistributions.affinity(0, "T1", "hash"),
                         "ID", Integer.class, "JID", Integer.class, "VAL", String.class)
-                        .addIndex(RelCollations.of(TraitUtils.createFieldCollation(1, ColumnCollation.DESC_NULLS_FIRST)), "t1_jid_idx")
+                        .addIndex(RelCollations.of(TraitUtils.createFieldCollation(1, ColumnCollation.DESC_NULLS_LAST)), "t1_jid_idx")
         );
 
         String sql = "select * "
@@ -220,20 +205,22 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
                 isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
                         .and(input(1, isInstanceOf(IgniteSortedIndexSpool.class)
                                 .and(spool -> {
-                                    List<RexNode> lowerBound = spool.indexCondition().lowerBound();
+                                    List<SearchBounds> searchBounds = spool.searchBounds();
 
                                     // Condition is LESS_THEN, but we have DESC field and condition should be in lower bound
                                     // instead of upper bound.
-                                    assertNotNull(lowerBound);
-                                    assertEquals(3, lowerBound.size());
-
-                                    assertNull(lowerBound.get(0));
-                                    assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
-                                    assertNull(lowerBound.get(2));
-
-                                    List<RexNode> upperBound = spool.indexCondition().upperBound();
-
-                                    assertNull(upperBound);
+                                    assertNotNull(searchBounds);
+                                    assertEquals(3, searchBounds.size());
+
+                                    assertNull(searchBounds.get(0));
+                                    assertTrue(searchBounds.get(1) instanceof RangeBounds);
+                                    RangeBounds fld1Bounds = (RangeBounds) searchBounds.get(1);
+                                    assertTrue(fld1Bounds.lowerBound() instanceof RexFieldAccess);
+                                    assertFalse(fld1Bounds.lowerInclude());
+                                    // NULLS LAST in collation, so nulls can be skipped by upper bound.
+                                    assertTrue(((RexLiteral) fld1Bounds.upperBound()).isNull());
+                                    assertFalse(fld1Bounds.upperInclude());
+                                    assertNull(searchBounds.get(2));
 
                                     return true;
                                 })