You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/09/28 13:18:37 UTC

[ignite-3] branch ignite-17655 created (now 9602cc3b7c)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 9602cc3b7c WIP. Add test.

This branch includes the following new commits:

     new d0f949843d WIP. Add index scan node.
     new 9602cc3b7c WIP. Add test.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/02: WIP. Add index scan node.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d0f949843d3314ffd4f18144357f6c5609b2857b
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Sep 27 17:51:50 2022 +0300

    WIP. Add index scan node.
---
 .../apache/ignite/internal/index/HashIndex.java    |   5 +-
 .../org/apache/ignite/internal/index/Index.java    |   5 +-
 .../apache/ignite/internal/index/SortedIndex.java  |  35 +-
 .../ignite/internal/index/SortedIndexImpl.java     |  15 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |  35 +-
 .../sql/engine/exec/rel/IndexScanNode.java         | 363 +++++++++++++++++++++
 .../sql/engine/planner/AbstractPlannerTest.java    |  11 +-
 7 files changed, 431 insertions(+), 38 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 9d96acdb65..2aa21f520a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -20,8 +20,9 @@ package org.apache.ignite.internal.index;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * An object that represents a hash index.
@@ -70,7 +71,7 @@ public class HashIndex implements Index<IndexDescriptor> {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+    public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
index c454159e6e..e550e52767 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
@@ -19,8 +19,9 @@ package org.apache.ignite.internal.index;
 
 import java.util.BitSet;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * An object describing an abstract index.
@@ -39,5 +40,5 @@ public interface Index<DescriptorT extends IndexDescriptor> {
     DescriptorT descriptor();
 
     /** Returns cursor for the values corresponding to the given key. */
-    Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns);
+    Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns);
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
index 95fa2488de..c713d53e83 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.index;
 
 import java.util.BitSet;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An object describing a sorted index.
@@ -36,25 +38,42 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
     /**
      * Opens a range cursor for given bounds with left bound included in result and right excluded.
      *
+     * @param partId Partition.
+     * @param tx Transaction.
      * @param left Left bound of range.
      * @param right Right bound of range.
      * @param columns Columns to include.
      * @return A cursor from resulting rows.
      */
-    default Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
-        return scan(left, right, INCLUDE_LEFT, columns);
+    default Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple left,
+            @Nullable BinaryTuple right,
+            BitSet columns
+    ) {
+        return scan(partId, tx, left, right, INCLUDE_LEFT, columns);
     }
 
     /**
      * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask.
      *
-     * @param left Left bound of range.
-     * @param right Right bound of range.
-     * @param includeBounds A mask that defines whether to include bounds into the final result or not.
-     * @param columns Columns to include.
+     * @param partId Partition.
+     * @param tx Transaction.
+     * @param leftBound Left bound of range.
+     * @param rightBound Right bound of range.
+     * @param flags A mask that defines whether to include bounds into the final result or not.
+     * @param columnsToInclude Columns to include.
      * @return A cursor from resulting rows.
      * @see SortedIndex#INCLUDE_LEFT
      * @see SortedIndex#INCLUDE_RIGHT
      */
-    Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns);
+    Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple leftBound,
+            @Nullable BinaryTuple rightBound,
+            int flags,
+            BitSet columnsToInclude
+    );
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index 07a0249d81..e68147e4ec 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.index;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An object that represents a sorted index.
@@ -70,13 +72,20 @@ public class SortedIndexImpl implements SortedIndex {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+    public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+    public Publisher<BinaryTuple> scan(
+            int partId,
+            InternalTransaction tx,
+            @Nullable BinaryTuple leftBound,
+            @Nullable BinaryTuple rightBound,
+            int flags,
+            BitSet columnsToInclude
+    ) {
         throw new UnsupportedOperationException("Index scan is not implemented yet");
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 04179624ae..b7d260d9a8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNo
 import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
+import org.apache.ignite.internal.sql.engine.exec.rel.IndexScanNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.IndexSpoolNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.IntersectNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.LimitNode;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -282,32 +284,27 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteIndexScan rel) {
-        // TODO: fix this
-        //        RexNode condition = rel.condition();
-        //        List<RexNode> projects = rel.projects();
-
         InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
-        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
         ImmutableBitSet requiredColumns = rel.requiredColumns();
-        //        List<RexNode> lowerCond = rel.lowerBound();
-        //        List<RexNode> upperCond = rel.upperBound();
-
         RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
-        //        Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
-        //        Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
-        //        Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
-        //        Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
-        //
-        //        IgniteIndex idx = tbl.getIndex(rel.indexName());
-        //
-        //        ColocationGroup group = ctx.group(rel.sourceId());
+        List<RexNode> lowerCond = rel.lowerBound();
+        List<RexNode> upperCond = rel.upperBound();
+        RexNode condition = rel.condition();
+        List<RexNode> projects = rel.projects();
+
+        Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
+        Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
+        Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
+        Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
 
-        Iterable<RowT> rowsIter = (Iterable<RowT>) List.of(new Object[]{0, 0},
-                new Object[]{1, 1}); //idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+        IgniteIndex idx = tbl.getIndex(rel.indexName());
+        ColocationGroup group = ctx.group(rel.sourceId());
+        int[] parts = group.partitions(ctx.localNodeId());
 
-        return new ScanNode<>(ctx, rowType, rowsIter);
+        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, lower, upper, filters, prj, requiredColumns.toBitSet());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
new file mode 100644
index 0000000000..31c123b347
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.index.SortedIndex;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan node that reads rows from a sorted index.
+ * TODO: merge with {@link TableScanNode}
+ */
+public class IndexScanNode<RowT> extends AbstractNode<RowT> {
+    /** Special value to highlight that all rows were received and we are not waiting anymore. */
+    private static final int NOT_WAITING = -1;
+
+    /** Index that provides access to underlying data. */
+    private final SortedIndex physIndex;
+
+    /** Table that is an object in SQL schema. */
+    private final InternalIgniteTable schemaTable;
+
+    private final RowHandler.RowFactory<RowT> factory;
+
+    private final int[] parts;
+
+    private final Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
+
+    private final @Nullable Predicate<RowT> filters;
+
+    private final @Nullable Function<RowT, RowT> rowTransformer;
+
+    /** Participating columns. */
+    private final @Nullable BitSet requiredColumns;
+
+    private final @Nullable Supplier<RowT> lowerCond;
+
+    private final @Nullable Supplier<RowT> upperCond;
+
+    private final int flags;
+
+    private int requested;
+
+    private int waiting;
+
+    private boolean inLoop;
+
+    private Subscription activeSubscription;
+
+    private int curPartIdx;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx             Execution context.
+     * @param rowType         Output type of the current node.
+     * @param schemaTable     The table this node should scan.
+     * @param parts           Partition numbers to scan.
+     * @param filters         Optional filter to filter out rows.
+     * @param rowTransformer  Optional projection function.
+     * @param requiredColumns Optional set of column of interest.
+     */
+    public IndexScanNode(
+            ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteIndex index,
+            InternalIgniteTable schemaTable,
+            int[] parts,
+            @Nullable Supplier<RowT> lowerCond,
+            @Nullable Supplier<RowT> upperCond,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx, rowType);
+        assert !nullOrEmpty(parts);
+
+        this.physIndex = (SortedIndex) index.index(); // Fails with ClassCastException for a non-sorted index.
+        this.schemaTable = schemaTable;
+        this.parts = parts;
+        this.lowerCond = lowerCond;
+        this.upperCond = upperCond;
+        this.filters = filters;
+        this.rowTransformer = rowTransformer;
+        this.requiredColumns = requiredColumns;
+
+        factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);
+
+        // TODO: create ticket to add flags support. The flags are a bit mask, so both bounds are combined with OR (AND gives 0).
+        flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void request(int rowsCnt) throws Exception {
+        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (!inLoop) {
+            context().execute(this::push, this::onError);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void closeInternal() {
+        super.closeInternal();
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        curPartIdx = 0;
+
+        if (activeSubscription != null) {
+            activeSubscription.cancel();
+
+            activeSubscription = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void register(List<Node<RowT>> sources) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Downstream<RowT> requestDownstream(int idx) {
+        throw new UnsupportedOperationException();
+    }
+
+    private void push() throws Exception {
+        if (isClosed()) {
+            return;
+        }
+
+        checkState();
+
+        if (requested > 0 && !inBuff.isEmpty()) {
+            inLoop = true;
+            try {
+                while (requested > 0 && !inBuff.isEmpty()) {
+                    checkState();
+
+                    RowT row = inBuff.poll();
+
+                    if (filters != null && !filters.test(row)) {
+                        continue;
+                    }
+
+                    if (rowTransformer != null) {
+                        row = rowTransformer.apply(row);
+                    }
+
+                    requested--;
+                    downstream().push(row);
+                }
+            } finally {
+                inLoop = false;
+            }
+        }
+
+        if (waiting == 0 || activeSubscription == null) {
+            requestNextBatch();
+        }
+
+        if (requested > 0 && waiting == NOT_WAITING) {
+            if (inBuff.isEmpty()) {
+                requested = 0;
+                downstream().end();
+            } else {
+                context().execute(this::push, this::onError);
+            }
+        }
+    }
+
+    private void requestNextBatch() {
+        if (waiting == NOT_WAITING) {
+            return;
+        }
+
+        if (waiting == 0) {
+            // we must not request rows more than inBufSize
+            waiting = inBufSize - inBuff.size();
+        }
+
+        Subscription subscription = this.activeSubscription;
+        if (subscription != null) {
+            subscription.request(waiting);
+        } else if (curPartIdx < parts.length) {
+            //TODO: Merge sorted iterators.
+            physIndex.scan(
+                    parts[curPartIdx++],
+                    context().transaction(),
+                    extractIndexColumns(lowerCond),
+                    extractIndexColumns(upperCond),
+                    flags,
+                    requiredColumns
+            ).subscribe(new SubscriberImpl());
+        } else {
+            waiting = NOT_WAITING;
+        }
+    }
+
+    private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
+
+        private int received = 0; // HB guarded here.
+        /** {@inheritDoc} */
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            assert IndexScanNode.this.activeSubscription == null;
+
+            IndexScanNode.this.activeSubscription = subscription;
+            subscription.request(waiting);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onNext(BinaryTuple binRow) {
+            RowT row = convert(binRow);
+
+            inBuff.add(row);
+
+            if (++received == inBufSize) {
+                received = 0;
+
+                context().execute(() -> {
+                    waiting = 0;
+                    push();
+                }, IndexScanNode.this::onError);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onError(Throwable throwable) {
+            context().execute(() -> {
+                throw throwable;
+            }, IndexScanNode.this::onError);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onComplete() {
+            context().execute(() -> {
+                activeSubscription = null;
+                waiting = 0;
+
+                push();
+            }, IndexScanNode.this::onError);
+        }
+
+    }
+
+    //TODO: Decouple from table and move code to converter.
+    //TODO: Change table row layout -> index row layout.
+    @Contract("null -> null")
+    private @Nullable BinaryTuple extractIndexColumns(@Nullable Supplier<RowT> lowerCond) {
+        if (lowerCond == null)
+            return null;
+
+        TableDescriptor desc = schemaTable.descriptor();
+
+        BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create( IntStream.range(0, desc.columnsCount())
+                .mapToObj(i -> desc.columnDescriptor(i))
+                .map(colDesc -> new Element(colDesc.physicalType(), colDesc.nullable()))
+                .toArray(Element[]::new));
+
+
+        BinaryRowEx binaryRowEx = schemaTable.toModifyRow(context(), lowerCond.get(), Operation.INSERT, null).getRow();
+
+        ByteBuffer tuple = BinaryConverter.forRow(((Row) binaryRowEx).schema()).toTuple(binaryRowEx);
+
+        return new BinaryTuple(binaryTupleSchema, tuple);
+    }
+
+    //TODO: Decouple from table and move code to converter.
+    private RowT convert(BinaryTuple binTuple) {
+        ExecutionContext<RowT> ectx = context();
+        TableDescriptor desc = schemaTable.descriptor();
+
+        RowHandler<RowT> handler = factory.handler();
+        RowT res = factory.create();
+
+        if (requiredColumns == null) {
+            for (int i = 0; i < desc.columnsCount(); i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(i);
+
+                handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+            }
+        } else {
+            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
+                ColumnDescriptor colDesc = desc.columnDescriptor(j);
+
+                handler.set(i, res, TypeUtils.toInternal(ectx, binTuple.value(colDesc.physicalIndex())));
+            }
+        }
+
+        return res;
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index cd4f3a3625..894d00c45b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -121,6 +122,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -1200,19 +1202,20 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple left, BinaryTuple right, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple left, BinaryTuple right, byte includeBounds, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx,
+                @Nullable BinaryTuple leftBound, @Nullable BinaryTuple rightBound, int flags, BitSet columnsToInclude) {
             throw new AssertionError("Should not be called");
         }
     }
@@ -1260,7 +1263,7 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public Cursor<BinaryTuple> scan(BinaryTuple key, BitSet columns) {
+        public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
             throw new AssertionError("Should not be called");
         }
     }


[ignite-3] 02/02: WIP. Add test.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17655
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9602cc3b7c8c038ea672ed5b32ab02ca6c767f1b
Author: amashenkov <an...@gmail.com>
AuthorDate: Wed Sep 28 16:18:27 2022 +0300

    WIP. Add test.
---
 .../sql/engine/exec/rel/IndexScanNode.java         |  25 ++-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   2 +
 .../exec/rel/IndexScanNodeExecutionTest.java       | 176 +++++++++++++++++++++
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   4 +-
 .../exec/rel/NestedLoopJoinExecutionTest.java      |   2 -
 5 files changed, 189 insertions(+), 20 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 31c123b347..8de50a2e23 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Queue;
@@ -33,13 +32,8 @@ import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.calcite.rel.core.TableModify.Operation;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
-import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.apache.ignite.internal.index.SortedIndex;
 import org.apache.ignite.internal.schema.BinaryConverter;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -101,12 +95,12 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     /**
      * Constructor.
      *
-     * @param ctx             Execution context.
-     * @param rowType         Output type of the current node.
-     * @param schemaTable     The table this node should scan.
-     * @param parts           Partition numbers to scan.
-     * @param filters         Optional filter to filter out rows.
-     * @param rowTransformer  Optional projection function.
+     * @param ctx Execution context.
+     * @param rowType Output type of the current node.
+     * @param schemaTable The table this node should scan.
+     * @param parts Partition numbers to scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
      * @param requiredColumns Optional set of column of interest.
      */
     public IndexScanNode(
@@ -267,6 +261,7 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     private class SubscriberImpl implements Flow.Subscriber<BinaryTuple> {
 
         private int received = 0; // HB guarded here.
+
         /** {@inheritDoc} */
         @Override
         public void onSubscribe(Subscription subscription) {
@@ -318,17 +313,17 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {
     //TODO: Change table row layout -> index row layout.
     @Contract("null -> null")
     private @Nullable BinaryTuple extractIndexColumns(@Nullable Supplier<RowT> lowerCond) {
-        if (lowerCond == null)
+        if (lowerCond == null) {
             return null;
+        }
 
         TableDescriptor desc = schemaTable.descriptor();
 
-        BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create( IntStream.range(0, desc.columnsCount())
+        BinaryTupleSchema binaryTupleSchema = BinaryTupleSchema.create(IntStream.range(0, desc.columnsCount())
                 .mapToObj(i -> desc.columnDescriptor(i))
                 .map(colDesc -> new Element(colDesc.physicalType(), colDesc.nullable()))
                 .toArray(Element[]::new));
 
-
         BinaryRowEx binaryRowEx = schemaTable.toModifyRow(context(), lowerCond.get(), Operation.INSERT, null).getRow();
 
         ByteBuffer tuple = BinaryConverter.forRow(((Row) binaryRowEx).schema()).toTuple(binaryRowEx);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index a25b4c3c38..5f17133c0b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -55,6 +55,8 @@ import org.junit.jupiter.api.BeforeEach;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class AbstractExecutionTest extends IgniteAbstractTest {
+    public static final Object[][] EMPTY = new Object[0][];
+
     private Throwable lastE;
 
     private QueryTaskExecutorImpl taskExecutor;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
new file mode 100644
index 0000000000..8d16504976
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.index.SortedIndex;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+//TODO: Check merging of multiple tries.
+public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
+    /**
+     * Checks scanning of a sorted index for an empty and for a non-empty data set.
+     */
+    @Test
+    public void sortedIndex() {
+        // Empty index: no rows expected.
+        verifyIndexScan(EMPTY, ImmutableBitSet.of(0, 1), EMPTY);
+
+        // Non-empty index.
+        Object[][] tableData = {
+                {1, "Roman", null},
+                {2, "Igor", 4L},
+                {3, "Taras", 3L},
+                {4, "Alexey", 1L},
+                {5, "Ivan", 4L},
+                {6, "Andrey", 2L}
+        };
+
+        // TODO: sort data
+        Object[][] expected = {
+                {1, "Roman", null},
+                {2, "Igor", null},
+                {3, "Taras", null},
+                {4, "Alexey", null},
+                {5, "Ivan", null},
+                {6, "Andrey", null}
+        };
+
+        verifyIndexScan(tableData, ImmutableBitSet.of(0, 2), expected);
+    }
+
+    /** Runs an {@code IndexScanNode} over a mocked sorted index and checks the produced rows against {@code expRes}. */
+    private void verifyIndexScan(Object[][] tableData, ImmutableBitSet requiredColumns, Object[][] expRes) {
+        BinaryTuple[] allRows = createRows(tableData);
+        int half = allRows.length / 2;
+        // The first half of the rows is returned for scan(0, ...), the rest for scan(2, ...).
+        BinaryTuple[] firstPartRows = Arrays.copyOfRange(allRows, 0, half);
+        BinaryTuple[] secondPartRows = Arrays.copyOfRange(allRows, half, allRows.length);
+
+        ExecutionContext<Object[]> ectx = executionContext(true);
+        RelDataType rowType = TypeUtils.createRowType(ectx.getTypeFactory(), long.class, String.class, int.class);
+
+        IgniteIndex indexMock = Mockito.mock(IgniteIndex.class);
+        SortedIndex sortedIndexMock = Mockito.mock(SortedIndex.class);
+        Mockito.doReturn(sortedIndexMock).when(indexMock).index();
+
+        Mockito.doReturn(dummyPublisher(firstPartRows)).when(sortedIndexMock)
+                .scan(Mockito.eq(0), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any());
+        Mockito.doReturn(dummyPublisher(secondPartRows)).when(sortedIndexMock)
+                .scan(Mockito.eq(2), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any());
+
+        AbstractPlannerTest.TestTable table = new AbstractPlannerTest.TestTable(rowType) {
+            @Override
+            public IgniteDistribution distribution() {
+                return IgniteDistributions.any();
+            }
+        };
+
+        IndexScanNode<Object[]> scanNode = new IndexScanNode<>(
+                ectx,
+                rowType,
+                indexMock,
+                table,
+                new int[]{0, 2},
+                null,
+                null,
+                null,
+                null,
+                requiredColumns.toBitSet()
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ectx, rowType);
+        root.register(scanNode);
+
+        // Drain the root node completely before comparing.
+        ArrayList<Object[]> actual = new ArrayList<>();
+        while (root.hasNext()) {
+            actual.add(root.next());
+        }
+
+        assertThat(actual.toArray(EMPTY), equalTo(expRes));
+    }
+
+    private BinaryTuple[] createRows(Object[][] tableData) {
+        // Schema layout: key (INT32), idxVal (INT64), val (string).
+        Column[] keyCols = {new Column("key", NativeTypes.INT32, false)};
+        Column[] valCols = {
+                new Column("idxVal", NativeTypes.INT64, true),
+                new Column("val", NativeTypes.stringOf(Integer.MAX_VALUE), true)
+        };
+        BinaryTupleSchema schema = BinaryTupleSchema.createRowSchema(new SchemaDescriptor(1, keyCols, valCols));
+
+        BinaryTuple[] tuples = new BinaryTuple[tableData.length];
+        for (int i = 0; i < tableData.length; i++) {
+            Object[] row = tableData[i];
+            // Input rows are {key, val, idxVal}; the tuple is built in key, idxVal, val order.
+            tuples[i] = new BinaryTuple(schema, new BinaryTupleBuilder(3, true)
+                    .appendInt((int) row[0]).appendLong((Long) row[2]).appendString((String) row[1]).build());
+        }
+
+        return tuples;
+    }
+
+    /** Creates a publisher that pushes all {@code rows} to the subscriber on subscribe, ignoring demand and cancellation. */
+    @NotNull
+    private static Publisher<BinaryTuple> dummyPublisher(BinaryTuple[] rows) {
+        return subscriber -> {
+            subscriber.onSubscribe(new Subscription() {
+                @Override
+                public void request(long n) {
+                    // Demand is ignored: all rows are pushed right after onSubscribe().
+                }
+
+                @Override
+                public void cancel() {
+                    // Cancellation is ignored as well.
+                }
+            });
+
+            for (BinaryTuple row : rows) {
+                subscriber.onNext(row);
+            }
+            subscriber.onComplete();
+        };
+    }
+
+    //TODO: Add a test for hash index.
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 078111b024..019e3fec03 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -12,7 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+* limitations under the License.
  */
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
@@ -42,8 +42,6 @@ import org.junit.jupiter.api.Test;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class MergeJoinExecutionTest extends AbstractExecutionTest {
-    public static final Object[][] EMPTY = new Object[0][];
-
     @Test
     public void joinEmptyTables() {
         verifyJoin(EMPTY, EMPTY, INNER, EMPTY);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index 6f8c763959..d457b95306 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -43,8 +43,6 @@ import org.junit.jupiter.api.Test;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class NestedLoopJoinExecutionTest extends AbstractExecutionTest {
-    public static final Object[][] EMPTY = new Object[0][];
-
     @Test
     public void joinEmptyTables() {
         verifyJoin(EMPTY, EMPTY, INNER, EMPTY);