You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/09/28 13:18:38 UTC
[ignite-3] 01/02: WIP. Add index scan node.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d0f949843d3314ffd4f18144357f6c5609b2857b
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Sep 27 17:51:50 2022 +0300
WIP. Add index scan node.
---
.../apache/ignite/internal/index/HashIndex.java | 5 +-
.../org/apache/ignite/internal/index/Index.java | 5 +-
.../apache/ignite/internal/index/SortedIndex.java | 35 +-
.../ignite/internal/index/SortedIndexImpl.java | 15 +-
.../sql/engine/exec/LogicalRelImplementor.java | 35 +-
.../sql/engine/exec/rel/IndexScanNode.java | 363 +++++++++++++++++++++
.../sql/engine/planner/AbstractPlannerTest.java | 11 +-
7 files changed, 431 insertions(+), 38 deletions(-)
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 9d96acdb65..2aa21f520a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -20,8 +20,9 @@ package org.apache.ignite.internal.index;
import java.util.BitSet;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
/**
* An object that represents a hash index.
@@ -70,7 +71,7 @@ public class HashIndex implements Index<IndexDescriptor> {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
throw new UnsupportedOperationException("Index scan is not implemented yet");
}
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
index c454159e6e..e550e52767 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
@@ -19,8 +19,9 @@ package org.apache.ignite.internal.index;
import java.util.BitSet;
import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
/**
* An object describing an abstract index.
@@ -39,5 +40,5 @@ public interface Index<DescriptorT extends IndexDescriptor> {
DescriptorT descriptor();
/** Returns cursor for the values corresponding to the given key. */
- Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns);
+ Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns);
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
index 95fa2488de..c713d53e83 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.index;
import java.util.BitSet;
+import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
/**
* An object describing a sorted index.
@@ -36,25 +38,42 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
/**
* Opens a range cursor for given bounds with left bound included in result and right excluded.
*
+ * @param partId Partition.
+ * @param tx Transaction.
* @param left Left bound of range.
* @param right Right bound of range.
* @param columns Columns to include.
* @return A cursor from resulting rows.
*/
- default Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
- return scan(left, right, INCLUDE_LEFT, columns);
+ default Publisher<BinaryTuple> scan(
+ int partId,
+ InternalTransaction tx,
+ @Nullable BinaryTuple left,
+ @Nullable BinaryTuple right,
+ BitSet columns
+ ) {
+ return scan(partId, tx, left, right, INCLUDE_LEFT, columns);
}
/**
* Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask.
*
- * @param left Left bound of range.
- * @param right Right bound of range.
- * @param includeBounds A mask that defines whether to include bounds into the final result or not.
- * @param columns Columns to include.
+ * @param partId Partition.
+ * @param tx Transaction.
+ * @param leftBound Left bound of range.
+ * @param rightBound Right bound of range.
+ * @param flags A mask that defines whether to include bounds into the final result or not.
+ * @param columnsToInclude Columns to include.
* @return A cursor from resulting rows.
* @see SortedIndex#INCLUDE_LEFT
* @see SortedIndex#INCLUDE_RIGHT
*/
- Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns);
+ Publisher<BinaryTuple> scan(
+ int partId,
+ InternalTransaction tx,
+ @Nullable BinaryTuple leftBound,
+ @Nullable BinaryTuple rightBound,
+ int flags,
+ BitSet columnsToInclude
+ );
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index 07a0249d81..e68147e4ec 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.index;
import java.util.BitSet;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
/**
* An object that represents a sorted index.
@@ -70,13 +72,20 @@ public class SortedIndexImpl implements SortedIndex {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
throw new UnsupportedOperationException("Index scan is not implemented yet");
}
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+ public Publisher<BinaryTuple> scan(
+ int partId,
+ InternalTransaction tx,
+ @Nullable BinaryTuple leftBound,
+ @Nullable BinaryTuple rightBound,
+ int flags,
+ BitSet columnsToInclude
+ ) {
throw new UnsupportedOperationException("Index scan is not implemented yet");
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 04179624ae..b7d260d9a8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNo
import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
+import org.apache.ignite.internal.sql.engine.exec.rel.IndexScanNode;
import org.apache.ignite.internal.sql.engine.exec.rel.IndexSpoolNode;
import org.apache.ignite.internal.sql.engine.exec.rel.IntersectNode;
import org.apache.ignite.internal.sql.engine.exec.rel.LimitNode;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -282,32 +284,27 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteIndexScan rel) {
- // TODO: fix this
- // RexNode condition = rel.condition();
- // List<RexNode> projects = rel.projects();
-
InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
- IgniteTypeFactory typeFactory = ctx.getTypeFactory();
+ IgniteTypeFactory typeFactory = ctx.getTypeFactory();
ImmutableBitSet requiredColumns = rel.requiredColumns();
- // List<RexNode> lowerCond = rel.lowerBound();
- // List<RexNode> upperCond = rel.upperBound();
-
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
- // Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
- // Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
- // Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
- // Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
- //
- // IgniteIndex idx = tbl.getIndex(rel.indexName());
- //
- // ColocationGroup group = ctx.group(rel.sourceId());
+ List<RexNode> lowerCond = rel.lowerBound();
+ List<RexNode> upperCond = rel.upperBound();
+ RexNode condition = rel.condition();
+ List<RexNode> projects = rel.projects();
+
+ Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
+ Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
+ Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
+ Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
- Iterable<RowT> rowsIter = (Iterable<RowT>) List.of(new Object[]{0, 0},
- new Object[]{1, 1}); //idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+ IgniteIndex idx = tbl.getIndex(rel.indexName());
+ ColocationGroup group = ctx.group(rel.sourceId());
+ int[] parts = group.partitions(ctx.localNodeId());
- return new ScanNode<>(ctx, rowType, rowsIter);
+ return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, lower, upper, filters, prj, requiredColumns.toBitSet());
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
new file mode 100644
index 0000000000..31c123b347
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.index.SortedIndex;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan node.
+ * TODO: merge with {@link TableScanNode}
+ */
+public class IndexScanNode<RowT> extends AbstractNode<RowT> {
+ /** Special value to highlights that all row were received and we are not waiting any more. */
+ private static final int NOT_WAITING = -1;
+
+ /** Index that provides access to underlying data. */
+ private final SortedIndex physIndex;
+
+ /** Table that is an object in SQL schema. */
+ private final InternalIgniteTable schemaTable;
+
+ private final RowHandler.RowFactory<RowT> factory;
+
+ private final int[] parts;
+
+ private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+ private final @Nullable Predicate<RowT> filters;
+
+ private final @Nullable Function<RowT, RowT> rowTransformer;
+
+ /** Participating columns. */
+ private final @Nullable BitSet requiredColumns;
+
+ private final @Nullable Supplier<RowT> lowerCond;
+
+ private final @Nullable Supplier<RowT> upperCond;
+
+ private final int flags;
+
+ private int requested;
+
+ private int waiting;
+
+ private boolean inLoop;
+
+ private Subscription activeSubscription;
+
+ private int curPartIdx;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Execution context.
+ * @param rowType Output type of the current node.
+ * @param schemaTable The table this node should scan.
+ * @param parts Partition numbers to scan.
+ * @param filters Optional filter to filter out rows.
+ * @param rowTransformer Optional projection function.
+ * @param requiredColumns Optional set of column of interest.
+ */
+ public IndexScanNode(
+ ExecutionContext<RowT> ctx,
+ RelDataType rowType,
+ IgniteIndex index,
+ InternalIgniteTable schemaTable,
+ int[] parts,
+ @Nullable Supplier<RowT> lowerCond,
+ @Nullable Supplier<RowT> upperCond,
+ @Nullable Predicate<RowT> filters,
+ @Nullable Function<RowT, RowT> rowTransformer,
+ @Nullable BitSet requiredColumns
+ ) {
+ super(ctx, rowType);
+ assert !nullOrEmpty(parts);
+
+ this.physIndex = (SortedIndex) index.index();
+ this.schemaTable = schemaTable;
+ this.parts = parts;
+ this.lowerCond = lowerCond;
+ this.upperCond = upperCond;
+ this.filters = filters;
+ this.rowTransformer = rowTransformer;
+ this.requiredColumns = requiredColumns;
+
+ factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+ // TODO: create ticket to add flags support
+ flags = SortedIndex.INCLUDE_LEFT & SortedIndex.INCLUDE_RIGHT;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void request(int rowsCnt) throws Exception {
+ assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+ checkState();
+
+ requested = rowsCnt;
+
+ if (!inLoop) {
+ context().execute(this::push, this::onError);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void closeInternal() {
+ super.closeInternal();
+
+ if (activeSubscription != null) {
+ activeSubscription.cancel();
+
+ activeSubscription = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void rewindInternal() {
+ requested = 0;
+ waiting = 0;
+ curPartIdx = 0;
+
+ if (activeSubscription != null) {
+ activeSubscription.cancel();
+
+ activeSubscription = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void register(List<Node<RowT>> sources) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected Downstream<RowT> requestDownstream(int idx) {
+ throw new UnsupportedOperationException();
+ }
+
+ private void push() throws Exception {
+ if (isClosed()) {
+ return;
+ }
+
+ checkState();
+
+ if (requested > 0 && !inBuff.isEmpty()) {
+ inLoop = true;
+ try {
+ while (requested > 0 && !inBuff.isEmpty()) {
+ checkState();
+
+ RowT row = inBuff.poll();
+
+ if (filters != null && !filters.test(row)) {
+ continue;
+ }
+
+ if (rowTransformer != null) {
+ row = rowTransformer.apply(row);
+ }
+
+ requested--;
+ downstream().push(row);
+ }
+ } finally {
+ inLoop = false;
+ }
+ }
+
+ if (waiting == 0 || activeSubscription == null) {
+ requestNextBatch();
+ }
+
+ if (requested > 0 && waiting == NOT_WAITING) {
+ if (inBuff.isEmpty()) {
+ requested = 0;
+ downstream().end();
+ } else {
+ context().execute(this::push, this::onError);
+ }
+ }
+ }
+
+ private void requestNextBatch() {
+ if (waiting == NOT_WAITING) {
+ return;
+ }
+
+ if (waiting == 0) {
+ // we must not request rows more than inBufSize
+ waiting = inBufSize - inBuff.size();
+ }
+
+ Subscription subscription = this.activeSubscription;
+ if (subscription != null) {
+ subscription.request(waiting);
+ } else if (curPartIdx < parts.length) {
+ //TODO: Merge sorted iterators.
+ physIndex.scan(
+ parts[curPartIdx++],
+ context().transaction(),
+ extractIndexColumns(lowerCond),
+ extractIndexColumns(upperCond),
+ flags,
+ requiredColumns
+ ).subscribe(new SubscriberImpl());
+ } else {
+ waiting = NOT_WAITING;
+ }
+ }
+
+ private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
+
+ private int received = 0; // HB guarded here.
+ /** {@inheritDoc} */
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ assert IndexScanNode.this.activeSubscription == null;
+
+ IndexScanNode.this.activeSubscription = subscription;
+ subscription.request(waiting);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onNext(BinaryTuple binRow) {
+ RowT row = convert(binRow);
+
+ inBuff.add(row);
+
+ if (++received == inBufSize) {
+ received = 0;
+
+ context().execute(() -> {
+ waiting = 0;
+ push();
+ }, IndexScanNode.this::onError);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onError(Throwable throwable) {
+ context().execute(() -> {
+ throw throwable;
+ }, IndexScanNode.this::onError);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onComplete() {
+ context().execute(() -> {
+ activeSubscription = null;
+ waiting = 0;
+
+ push();
+ }, IndexScanNode.this::onError);
+ }
+
+ }
+
+ //TODO: Decouple from table and move code to converter.
+ //TODO: Change table row layout -> index row layout.
+ @Contract("null -> null")
+ private @Nullable BinaryTuple extractIndexColumns(@Nullable Supplier<RowT> lowerCond) {
+ if (lowerCond == null)
+ return null;
+
+ TableDescriptor desc = schemaTable.descriptor();
+
+ BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create( IntStream.range(0, desc.columnsCount())
+ .mapToObj(i -> desc.columnDescriptor(i))
+ .map(colDesc -> new Element(colDesc.physicalType(), colDesc.nullable()))
+ .toArray(Element[]::new));
+
+
+ BinaryRowEx binaryRowEx = schemaTable.toModifyRow(context(), lowerCond.get(), Operation.INSERT, null).getRow();
+
+ ByteBuffer tuple = BinaryConverter.forRow(((Row) binaryRowEx).schema()).toTuple(binaryRowEx);
+
+ return new BinaryTuple(binaryTupleSchema, tuple);
+ }
+
+ //TODO: Decouple from table and move code to converter.
+ private RowT convert(BinaryTuple binTuple) {
+ ExecutionContext<RowT> ectx = context();
+ TableDescriptor desc = schemaTable.descriptor();
+
+ RowHandler<RowT> handler = factory.handler();
+ RowT res = factory.create();
+
+ if (requiredColumns == null) {
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+ handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+ }
+ } else {
+ for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
+ ColumnDescriptor colDesc = desc.columnDescriptor(j);
+
+ handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+ }
+ }
+
+ return res;
+ }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index cd4f3a3625..894d00c45b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -121,6 +122,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -1200,19 +1202,20 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
throw new AssertionError("Should not be called");
}
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple left, BinaryTuple right, BitSet columns) {
throw new AssertionError("Should not be called");
}
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx,
+ @Nullable BinaryTuple leftBound, @Nullable BinaryTuple rightBound, int flags, BitSet columnsToInclude) {
throw new AssertionError("Should not be called");
}
}
@@ -1260,7 +1263,7 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+ public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
throw new AssertionError("Should not be called");
}
}