You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/09/28 13:18:38 UTC

[ignite-3] 01/02: WIP. Add index scan node.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d0f949843d3314ffd4f18144357f6c5609b2857b
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Sep 27 17:51:50 2022 +0300

    WIP. Add index scan node.
---
 .../apache/ignite/internal/index/HashIndex.java    |   5 +-
 .../org/apache/ignite/internal/index/Index.java    |   5 +-
 .../apache/ignite/internal/index/SortedIndex.java  |  35 +-
 .../ignite/internal/index/SortedIndexImpl.java     |  15 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |  35 +-
 .../sql/engine/exec/rel/IndexScanNode.java         | 363 +++++++++++++++++++++
 .../sql/engine/planner/AbstractPlannerTest.java    |  11 +-
 7 files changed, 431 insertions(+), 38 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 9d96acdb65..2aa21f520a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -20,8 +20,9 @@ package org.apache.ignite.internal.index;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * An object that represents a hash index.
@@ -70,7 +71,7 @@ public class HashIndex implements Index<IndexDescriptor> {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+    public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
index c454159e6e..e550e52767 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
@@ -19,8 +19,9 @@ package org.apache.ignite.internal.index;
 
 import java.util.BitSet;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * An object describing an abstract index.
@@ -39,5 +40,5 @@ public interface Index<DescriptorT extends IndexDescriptor> {
     DescriptorT descriptor();
 
     /** Returns cursor for the values corresponding to the given key. */
-    Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns);
+    Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns);
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
index 95fa2488de..c713d53e83 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.index;
 
 import java.util.BitSet;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An object describing a sorted index.
@@ -36,25 +38,42 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
     /**
      * Opens a range cursor for given bounds with left bound included in result and right excluded.
      *
+     * @param partId Partition.
+     * @param tx Transaction.
      * @param left Left bound of range.
      * @param right Right bound of range.
      * @param columns Columns to include.
      * @return A cursor from resulting rows.
      */
-    default Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
-        return scan(left, right, INCLUDE_LEFT, columns);
+    default Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple left,
+            @Nullable BinaryTuple right,
+            BitSet columns
+    ) {
+        return scan(partId, tx, left, right, INCLUDE_LEFT, columns);
     }
 
     /**
      * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask.
      *
-     * @param left Left bound of range.
-     * @param right Right bound of range.
-     * @param includeBounds A mask that defines whether to include bounds into the final result or not.
-     * @param columns Columns to include.
+     * @param partId Partition.
+     * @param tx Transaction.
+     * @param leftBound Left bound of range.
+     * @param rightBound Right bound of range.
+     * @param flags A mask that defines whether to include bounds into the final result or not.
+     * @param columnsToInclude Columns to include.
      * @return A cursor from resulting rows.
      * @see SortedIndex#INCLUDE_LEFT
      * @see SortedIndex#INCLUDE_RIGHT
      */
-    Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns);
+    Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple leftBound,
+            @Nullable BinaryTuple rightBound,
+            int flags,
+            BitSet columnsToInclude
+    );
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index 07a0249d81..e68147e4ec 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.index;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An object that represents a sorted index.
@@ -70,13 +72,20 @@ public class SortedIndexImpl implements SortedIndex {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+    public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+    public Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple leftBound,
+            @Nullable BinaryTuple rightBound,
+            int flags,
+            BitSet columnsToInclude
+    ) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 04179624ae..b7d260d9a8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNo
 import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
+import org.apache.ignite.internal.sql.engine.exec.rel.IndexScanNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.IndexSpoolNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.IntersectNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.LimitNode;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -282,32 +284,27 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteIndexScan rel) {
-        // TODO: fix this
-        //        RexNode condition = rel.condition();
-        //        List<RexNode> projects = rel.projects();
-
         InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
         ImmutableBitSet requiredColumns = rel.requiredColumns();
-        //        List<RexNode> lowerCond = rel.lowerBound();
-        //        List<RexNode> upperCond = rel.upperBound();
-
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
-        //        Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
-        //        Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
-        //        Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
-        //        Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
-        //
-        //        IgniteIndex idx = tbl.getIndex(rel.indexName());
-        //
-        //        ColocationGroup group = ctx.group(rel.sourceId());
+        List<RexNode> lowerCond = rel.lowerBound();
+        List<RexNode> upperCond = rel.upperBound();
+        RexNode condition = rel.condition();
+        List<RexNode> projects = rel.projects();
+
+        Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
+        Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
+        Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
+        Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
 
-        Iterable<RowT> rowsIter = (Iterable<RowT>) List.of(new Object[]{0, 0},
-                new Object[]{1, 1}); //idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+        IgniteIndex idx = tbl.getIndex(rel.indexName());
+        ColocationGroup group = ctx.group(rel.sourceId());
+        int[] parts = group.partitions(ctx.localNodeId());
 
-        return new ScanNode<>(ctx, rowType, rowsIter);
+        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, lower, upper, filters, prj, requiredColumns.toBitSet());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
new file mode 100644
index 0000000000..31c123b347
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.index.SortedIndex;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Execution node that scans a sorted index, one partition at a time.
+ * TODO: merge with {@link TableScanNode}
+ */
+public class IndexScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value indicating that all rows have been received and we are not waiting for any more. */
+    private static final int NOT_WAITING = -1;
+
+    /** Index that provides access to underlying data. */
+    private final SortedIndex physIndex;
+
+    /** Table that is an object in SQL schema. */
+    private final InternalIgniteTable schemaTable;
+
+    private final RowHandler.RowFactory<RowT> factory;
+
+    private final int[] parts;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    /** Participating columns. */
+    private final @Nullable BitSet requiredColumns;
+
+    private final @Nullable Supplier<RowT> lowerCond;
+
+    private final @Nullable Supplier<RowT> upperCond;
+
+    private final int flags;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    private int curPartIdx;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx             Execution context.
+     * @param rowType         Output type of the current node.
+     * @param schemaTable     The table this node should scan.
+     * @param parts           Partition numbers to scan.
+     * @param filters         Optional filter to filter out rows.
+     * @param rowTransformer  Optional projection function.
+     * @param requiredColumns Optional set of columns of interest.
+     */
+    public IndexScanNode(
+            ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteIndex index,
+            InternalIgniteTable schemaTable,
+            int[] parts,
+            @Nullable Supplier<RowT> lowerCond,
+            @Nullable Supplier<RowT> upperCond,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx, rowType);
+        assert !nullOrEmpty(parts);
+
+        this.physIndex = (SortedIndex) index.index();
+        this.schemaTable = schemaTable;
+        this.parts = parts;
+        this.lowerCond = lowerCond;
+        this.upperCond = upperCond;
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+        this.requiredColumns = requiredColumns;
+
+        factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+        // TODO: create ticket to add flags support. For now include both bounds: flag bits must be OR-ed, not AND-ed.
+        flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        curPartIdx = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void register(List<Node<RowT>> sources) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Downstream<RowT> requestDownstream(int idx) {
+        throw new UnsupportedOperationException();
+    }
+
+    private void push() throws Exception {
+        if (isClosed()) {
+            return;
+        }
+
+        checkState();
+
+        if (requested > 0 && !inBuff.isEmpty()) {
+            inLoop = true;
+            try {
+                while (requested > 0 && !inBuff.isEmpty()) {
+                    checkState();
+
+                    RowT row = inBuff.poll();
+
+                    if (filters != null && !filters.test(row)) {
+                        continue;
+                    }
+
+                    if (rowTransformer != null) {
+                        row = rowTransformer.apply(row);
+                    }
+
+                    requested--;
+                    downstream().push(row);
+                }
+            } finally {
+                inLoop = false;
+            }
+        }
+
+        if (waiting == 0 || activeSubscription == null) {
+            requestNextBatch();
+        }
+
+        if (requested > 0 && waiting == NOT_WAITING) {
+            if (inBuff.isEmpty()) {
+                requested = 0;
+                downstream().end();
+            } else {
+                context().execute(this::push, this::onError);
+            }
+        }
+    }
+
+    private void requestNextBatch() {
+        if (waiting == NOT_WAITING) {
+            return;
+        }
+
+        if (waiting == 0) {
+            // we must not request rows more than inBufSize
+            waiting = inBufSize - inBuff.size();
+        }
+
+        Subscription subscription = this.activeSubscription;
+        if (subscription != null) {
+            subscription.request(waiting);
+        } else if (curPartIdx < parts.length) {
+            //TODO: Merge sorted iterators.
+            physIndex.scan(
+                    parts[curPartIdx++],
+                    context().transaction(),
+                    extractIndexColumns(lowerCond),
+                    extractIndexColumns(upperCond),
+                    flags,
+                    requiredColumns
+            ).subscribe(new SubscriberImpl());
+        } else {
+            waiting = NOT_WAITING;
+        }
+    }
+
+    private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
+
+        private int received = 0; // HB guarded here.
+        /** {@inheritDoc} */
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            assert IndexScanNode.this.activeSubscription == null;
+
+            IndexScanNode.this.activeSubscription = subscription;
+            subscription.request(waiting);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onNext(BinaryTuple binRow) {
+            RowT row = convert(binRow);
+
+            inBuff.add(row);
+
+            if (++received == inBufSize) {
+                received = 0;
+
+                context().execute(() -> {
+                    waiting = 0;
+                    push();
+                }, IndexScanNode.this::onError);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onError(Throwable throwable) {
+            context().execute(() -> {
+                throw throwable;
+            }, IndexScanNode.this::onError);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onComplete() {
+            context().execute(() -> {
+                activeSubscription = null;
+                waiting = 0;
+
+                push();
+            }, IndexScanNode.this::onError);
+        }
+
+    }
+
+    //TODO: Decouple from table and move code to converter.
+    //TODO: Change table row layout -> index row layout.
+    /** Converts the bound row produced by {@code boundSupplier} into a binary tuple, or returns {@code null} if no bound is set. */
+    @Contract("null -> null")
+    private @Nullable BinaryTuple extractIndexColumns(@Nullable Supplier<RowT> boundSupplier) {
+        if (boundSupplier == null) {
+            return null;
+        }
+
+        TableDescriptor desc = schemaTable.descriptor();
+
+        BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create(IntStream.range(0, desc.columnsCount())
+                .mapToObj(desc::columnDescriptor)
+                .map(colDesc -> new Element(colDesc.physicalType(), colDesc.nullable()))
+                .toArray(Element[]::new));
+
+        BinaryRowEx binaryRowEx = schemaTable.toModifyRow(context(), boundSupplier.get(), Operation.INSERT, null).getRow();
+        ByteBuffer tuple = BinaryConverter.forRow(((Row) binaryRowEx).schema()).toTuple(binaryRowEx);
+
+        return new BinaryTuple(binaryTupleSchema, tuple);
+    }
+
+    //TODO: Decouple from table and move code to converter.
+    private RowT convert(BinaryTuple binTuple) {
+        ExecutionContext<RowT> ectx = context();
+        TableDescriptor desc = schemaTable.descriptor();
+
+        RowHandler<RowT> handler = factory.handler();
+        RowT res = factory.create();
+
+        if (requiredColumns == null) {
+            for (int i = 0; i < desc.columnsCount(); i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+                handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+            }
+        } else {
+            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(j);
+
+                handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+            }
+        }
+
+        return res;
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index cd4f3a3625..894d00c45b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -121,6 +122,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -1200,19 +1202,20 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple left, BinaryTuple right, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx,
+                @Nullable BinaryTuple leftBound, @Nullable BinaryTuple rightBound, int flags, BitSet columnsToInclude) {
             throw new AssertionError("Should not be called");
         }
     }
@@ -1260,7 +1263,7 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
     }