You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/10/13 14:37:57 UTC

[ignite-3] branch ignite-17748 updated (d8edfa014e -> 43774b939d)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-17748
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


 discard d8edfa014e WIP.
     new 43774b939d WIP.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d8edfa014e)
            \
             N -- N -- N   refs/heads/ignite-17748 (43774b939d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 modules/index/build.gradle | 1 +
 1 file changed, 1 insertion(+)


[ignite-3] 01/01: WIP.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17748
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 43774b939d512b659907afac4b4d805d5fab11b6
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Oct 13 17:32:58 2022 +0300

    WIP.
---
 .../ignite/client/fakes/FakeInternalTable.java     | 16 ++++
 .../ignite/internal/util/PublisherAdapter.java     | 85 ++++++++++++++++++++++
 modules/index/build.gradle                         |  1 +
 modules/index/pom.xml                              |  5 ++
 .../apache/ignite/internal/index/HashIndex.java    | 38 ++++++++--
 .../apache/ignite/internal/index/IndexManager.java | 22 ++++--
 .../ignite/internal/index/SortedIndexImpl.java     | 40 ++++++++--
 .../ignite/internal/index/IndexManagerTest.java    |  6 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  2 +-
 .../sql/engine/exec/MockedStructuresTest.java      |  2 +-
 .../ignite/internal/table/InternalTable.java       | 34 +++++++++
 .../distributed/storage/InternalTableImpl.java     | 70 ++++++++++++++----
 12 files changed, 285 insertions(+), 36 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index da457b78bf..5426139ac3 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.fakes;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -29,6 +30,7 @@ import java.util.function.BiConsumer;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -302,6 +304,20 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new OperationNotSupportedException());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, UUID indexId, BinaryTuple key,
+            BitSet columnsToInclude) {
+        throw new IgniteInternalException(new OperationNotSupportedException());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, UUID indexId, @Nullable BinaryTuple lowerBound,
+            @Nullable BinaryTuple upperBound, int flags, BitSet columnsToInclude) {
+        throw new IgniteInternalException(new OperationNotSupportedException());
+    }
+
     /** {@inheritDoc} */
     @Override
     public List<String> assignments() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PublisherAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PublisherAdapter.java
new file mode 100644
index 0000000000..ee560c206f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PublisherAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.Objects;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.function.Function;
+
+/**
+ * {@link Publisher} that wraps a source publisher and passes every emitted item through a converter function.
+ *
+ * @param <SourceT> Type of the items emitted by the wrapped publisher.
+ * @param <TargetT> Type of the items delivered to subscribers of this adapter.
+ */
+public class PublisherAdapter<SourceT, TargetT> implements Publisher<TargetT> {
+    private final Publisher<SourceT> delegate;
+    private final Function<SourceT, TargetT> converter;
+
+    public PublisherAdapter(Publisher<SourceT> delegate, Function<SourceT, TargetT> converter) {
+        this.delegate = Objects.requireNonNull(delegate, "Publisher");
+        this.converter = Objects.requireNonNull(converter, "Converter");
+    }
+
+    /** Subscribes a converting wrapper around the given subscriber to the wrapped publisher. */
+    @Override
+    public void subscribe(Subscriber<? super TargetT> subscriber) {
+        delegate.subscribe(new SubscriberAdapter<SourceT, TargetT>(subscriber, converter));
+    }
+
+    /**
+     * Subscriber that converts each received item and relays all signals to the wrapped subscriber.
+     * @param <SourceT> Type of the items received from the source publisher.
+     * @param <TargetT> Type of the items handed to the wrapped subscriber.
+     */
+    private static class SubscriberAdapter<SourceT, TargetT> implements Subscriber<SourceT> {
+        private final Subscriber<? super TargetT> delegate;
+        private final Function<SourceT, TargetT> converter;
+
+        SubscriberAdapter(Subscriber<? super TargetT> delegate, Function<SourceT, TargetT> converter) {
+            this.delegate = delegate;
+            this.converter = converter;
+        }
+
+        /** Hands the received subscription to the wrapped subscriber unchanged. */
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            delegate.onSubscribe(subscription);
+        }
+
+        /** Applies the converter to the item and forwards the result to the wrapped subscriber. */
+        @Override
+        public void onNext(SourceT item) {
+            delegate.onNext(converter.apply(item));
+        }
+
+        /** Forwards the error to the wrapped subscriber as is. */
+        @Override
+        public void onError(Throwable th) {
+            delegate.onError(th);
+        }
+
+        /** Forwards the completion signal to the wrapped subscriber. */
+        @Override
+        public void onComplete() {
+            delegate.onComplete();
+        }
+    }
+}
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 50276b5977..25ca9b99ec 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -23,6 +23,7 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-configuration')
     implementation project(':ignite-schema')
+    implementation project(':ignite-table')
     implementation project(':ignite-transactions')
     implementation project(':ignite-configuration')
     implementation project(':ignite-extended-api')
diff --git a/modules/index/pom.xml b/modules/index/pom.xml
index cf8a6ad27b..93f4bc07ff 100644
--- a/modules/index/pom.xml
+++ b/modules/index/pom.xml
@@ -53,6 +53,11 @@
             <artifactId>ignite-schema</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-table</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-transactions</artifactId>
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 2aa21f520a..5f7452f25f 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -17,32 +17,44 @@
 
 package org.apache.ignite.internal.index;
 
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.PublisherAdapter;
 
 /**
  * An object that represents a hash index.
  */
 public class HashIndex implements Index<IndexDescriptor> {
     private final UUID id;
-    private final UUID tableId;
+    private final InternalTable table;
     private final IndexDescriptor descriptor;
+    private final SchemaRegistry schemaRegistry;
 
     /**
      * Constructs the index.
      *
      * @param id An identifier of the index.
-     * @param tableId An identifier of the table this index relates to.
+     * @param table A table this index relates to.
      * @param descriptor A descriptor of the index.
      */
-    public HashIndex(UUID id, UUID tableId, IndexDescriptor descriptor) {
+    public HashIndex(UUID id, TableImpl table, IndexDescriptor descriptor) {
         this.id = Objects.requireNonNull(id, "id");
-        this.tableId = Objects.requireNonNull(tableId, "tableId");
+        this.table = Objects.requireNonNull(table.internalTable(), "table");
         this.descriptor = Objects.requireNonNull(descriptor, "descriptor");
+
+        schemaRegistry = table.schemaView();
     }
 
     /** {@inheritDoc} */
@@ -54,7 +66,7 @@ public class HashIndex implements Index<IndexDescriptor> {
     /** {@inheritDoc} */
     @Override
     public UUID tableId() {
-        return tableId;
+        return table.tableId();
     }
 
     /** {@inheritDoc} */
@@ -72,6 +84,20 @@ public class HashIndex implements Index<IndexDescriptor> {
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
-        throw new UnsupportedOperationException("Index scan is not implemented yet");
+        return new PublisherAdapter<>(
+                table.scan(partId, tx, id, key, columns),
+                this::convertToTuple
+        );
+    }
+
+    // TODO: fix row conversion, apply projection, upgrade row version if needed.
+    private BinaryTuple convertToTuple(BinaryRow row) {
+        SchemaDescriptor schemaDesc = schemaRegistry.schema(row.schemaVersion());
+
+        ByteBuffer tupleData = BinaryConverter.forRow(schemaDesc).toTuple(row);
+
+        BinaryTupleSchema tupleSchema = null;
+
+        return new BinaryTuple(tupleSchema, tupleData);
     }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 8b1334bafe..2aaf0c8e61 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -43,6 +43,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.StringUtils;
 import org.apache.ignite.lang.ErrorGroups;
@@ -69,14 +71,17 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
 
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
+    private final TableManager tblManager;
 
     /**
      * Constructor.
      *
      * @param tablesCfg Tables and indexes configuration.
+     * @param tblManager Table manager.
      */
-    public IndexManager(TablesConfiguration tablesCfg) {
+    public IndexManager(TablesConfiguration tablesCfg, TableManager tblManager) {
         this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
+        this.tblManager = tblManager;
     }
 
     /** {@inheritDoc} */
@@ -329,24 +334,27 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         LOG.trace("Creating local index: name={}, id={}, tableId={}, token={}",
                 tableIndexView.name(), tableIndexView.id(), tableId, causalityToken);
 
-        Index<?> index = newIndex(tableId, tableIndexView);
+        return tblManager.tableAsyncInternal(tableId, false)
+                .thenCompose(tbl -> {
+                    Index<?> index = newIndex(tbl, tableIndexView);
 
-        fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, index), null);
+                    fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, index), null);
 
-        return CompletableFuture.completedFuture(null);
+                    return CompletableFuture.completedFuture(null);
+                });
     }
 
-    private Index<?> newIndex(UUID tableId, TableIndexView indexView) {
+    private Index<?> newIndex(TableImpl table, TableIndexView indexView) {
         if (indexView instanceof SortedIndexView) {
             return new SortedIndexImpl(
                     indexView.id(),
-                    tableId,
+                    table,
                     convert((SortedIndexView) indexView)
             );
         } else if (indexView instanceof HashIndexView) {
             return new HashIndex(
                     indexView.id(),
-                    tableId,
+                    table,
                     convert((HashIndexView) indexView)
             );
         }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index e68147e4ec..a7a84f993d 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -17,12 +17,21 @@
 
 package org.apache.ignite.internal.index;
 
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.PublisherAdapter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,20 +39,23 @@ import org.jetbrains.annotations.Nullable;
  */
 public class SortedIndexImpl implements SortedIndex {
     private final UUID id;
-    private final UUID tableId;
+    private final InternalTable table;
     private final SortedIndexDescriptor descriptor;
+    private final SchemaRegistry schemaRegistry;
 
     /**
      * Constructs the sorted index.
      *
      * @param id An identifier of the index.
-     * @param tableId An identifier of the table this index relates to.
+     * @param table A table this index relates to.
      * @param descriptor A descriptor of the index.
      */
-    public SortedIndexImpl(UUID id, UUID tableId, SortedIndexDescriptor descriptor) {
+    public SortedIndexImpl(UUID id, TableImpl table, SortedIndexDescriptor descriptor) {
         this.id = Objects.requireNonNull(id, "id");
-        this.tableId = Objects.requireNonNull(tableId, "tableId");
+        this.table = Objects.requireNonNull(table.internalTable(), "table");
         this.descriptor = Objects.requireNonNull(descriptor, "descriptor");
+
+        schemaRegistry = table.schemaView();
     }
 
     /** {@inheritDoc} */
@@ -55,7 +67,7 @@ public class SortedIndexImpl implements SortedIndex {
     /** {@inheritDoc} */
     @Override
     public UUID tableId() {
-        return tableId;
+        return table.tableId();
     }
 
     /** {@inheritDoc} */
@@ -73,7 +85,7 @@ public class SortedIndexImpl implements SortedIndex {
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
-        throw new UnsupportedOperationException("Index scan is not implemented yet");
+        return scan(partId, tx, key, key, INCLUDE_LEFT, columns); // TODO: Fix flags.
     }
 
     /** {@inheritDoc} */
@@ -86,6 +98,20 @@ public class SortedIndexImpl implements SortedIndex {
             int flags,
             BitSet columnsToInclude
     ) {
-        throw new UnsupportedOperationException("Index scan is not implemented yet");
+        return new PublisherAdapter<>(
+                table.scan(partId, tx, id, leftBound, rightBound, flags, columnsToInclude),
+                this::convertToTuple
+        );
+    }
+
+    // TODO: fix row conversion.
+    private BinaryTuple convertToTuple(BinaryRow row) {
+        SchemaDescriptor schemaDesc = schemaRegistry.schema(row.schemaVersion());
+
+        ByteBuffer tupleData = BinaryConverter.forRow(schemaDesc).toTuple(row);
+
+        BinaryTupleSchema tupleSchema = null;
+
+        return new BinaryTuple(tupleSchema, tupleData);
     }
 }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 42f1dfc764..3f485fcfab 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.configuration.tree.TraversableTreeNode;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
 import org.apache.ignite.internal.schema.configuration.IndexValidatorImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -74,6 +75,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 
 /**
  * Test class to verify {@link IndexManager}.
@@ -117,7 +119,9 @@ public class IndexManagerTest {
 
         tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        indexManager = new IndexManager(tablesConfig);
+        TableManager tblManager = Mockito.mock(TableManager.class);
+
+        indexManager = new IndexManager(tablesConfig, tblManager);
         indexManager.start();
 
         tablesConfig.tables().change(tableChange -> tableChange.create("tName", chg -> {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0af27b5607..2929808994 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -381,7 +381,7 @@ public class IgniteImpl implements Ignite {
                 clock
         );
 
-        indexManager = new IndexManager(tablesConfiguration);
+        indexManager = new IndexManager(tablesConfiguration, distributedTblMgr);
 
         qryEngine = new SqlQueryProcessor(
                 registry,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index ea1b65ceea..ad2763a586 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -247,7 +247,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         tblManager = mockManagers();
 
-        idxManager = new IndexManager(tblsCfg);
+        idxManager = new IndexManager(tblsCfg, tblManager);
 
         idxManager.start();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 78ee0022fc..4218ee57c0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table;
 
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -24,12 +25,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -213,6 +217,36 @@ public interface InternalTable extends AutoCloseable {
      */
     Publisher<BinaryRow> scan(int p, @Nullable InternalTransaction tx);
 
+    /**
+     * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows.
+     *
+     * @param partId The partition.
+     * @param tx The transaction.
+     * @param indexId Index id.
+     * @param key Key to search.
+     * @param columnsToInclude Row projection. // TODO: Drop parameter or pushdown converter with the schema registry.
+     * @return {@link Publisher} that reactively notifies about partition rows.
+     */
+    default Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @NotNull UUID indexId, BinaryTuple key,
+            @Nullable BitSet columnsToInclude) {
+        return scan(partId, tx, indexId, key, key, SortedIndexStorage.GREATER_OR_EQUAL, columnsToInclude);
+    }
+
+    /**
+     * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows.
+     *
+     * @param partId The partition.
+     * @param tx The transaction.
+     * @param indexId Index id.
+     * @param lowerBound Lower search bound.
+     * @param upperBound Upper search bound.
+     * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
+     * @param columnsToInclude Row projection.
+     * @return {@link Publisher} that reactively notifies about partition rows.
+     */
+    Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @NotNull UUID indexId, @Nullable BinaryTuple lowerBound,
+            @Nullable BinaryTuple upperBound, int flags, @Nullable BitSet columnsToInclude);
+
     /**
      * Gets a count of partitions of the table.
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 5396321be6..7e21b4f193 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
@@ -43,7 +44,9 @@ import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissExcepti
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
@@ -304,7 +307,11 @@ public class InternalTableImpl implements InternalTable {
             @NotNull InternalTransaction tx,
             int partId,
             long scanId,
-            int batchSize
+            int batchSize,
+            @Nullable UUID indexId,
+            @Nullable BinaryTuple lowerBound,
+            @Nullable BinaryTuple upperBound,
+            int flags
     ) {
         String partGroupId = partitionMap.get(partId).groupId();
 
@@ -316,6 +323,10 @@ public class InternalTableImpl implements InternalTable {
                 .groupId(partGroupId)
                 .transactionId(tx.id())
                 .scanId(scanId)
+                .indexToUse(indexId)
+                .lowerBound(lowerBound)
+                .upperBound(upperBound)
+                .flags(flags)
                 .batchSize(batchSize)
                 .timestamp(clock.now());
 
@@ -351,7 +362,7 @@ public class InternalTableImpl implements InternalTable {
             Function<Long, ReplicaRequest> requestFunction,
             int attempts
     ) {
-        CompletableFuture<R> result = new CompletableFuture();
+        CompletableFuture<R> result = new CompletableFuture<>();
 
         enlist(partId, tx).<R>thenCompose(
                         primaryReplicaAndTerm -> {
@@ -684,23 +695,37 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> scan(int p, @Nullable InternalTransaction tx) {
-        if (p < 0 || p >= partitions) {
-            throw new IllegalArgumentException(
-                    IgniteStringFormatter.format(
-                            "Invalid partition [partition={}, minValue={}, maxValue={}].",
-                            p,
-                            0,
-                            partitions - 1
-                    )
-            );
-        }
+        ensureValidPartition(p);
+
+        final boolean implicit = tx == null;
+
+        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+
+        return new PartitionScanPublisher(
+                (scanId, batchSize) -> enlistCursorInTx(tx0, p, scanId, batchSize, null, null, null,
+                        SortedIndexStorage.GREATER_OR_EQUAL), fut -> postEnlist(fut, implicit, tx0)
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @NotNull UUID indexId, BinaryTuple key,
+            @Nullable BitSet columnsToInclude) {
+        return scan(partId, tx, indexId, key, key, SortedIndexStorage.GREATER_OR_EQUAL, columnsToInclude);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @NotNull UUID indexId, @Nullable BinaryTuple lowerBound,
+            @Nullable BinaryTuple upperBound, int flags, @Nullable BitSet columnsToInclude) {
+        ensureValidPartition(partId);
 
         final boolean implicit = tx == null;
 
         final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
         return new PartitionScanPublisher(
-                (scanId, batchSize) -> enlistCursorInTx(tx0, p, scanId, batchSize),
+                (scanId, batchSize) -> enlistCursorInTx(tx0, partId, scanId, batchSize, indexId, lowerBound, upperBound, flags),
                 fut -> postEnlist(fut, implicit, tx0)
         );
     }
@@ -800,6 +825,25 @@ public class InternalTableImpl implements InternalTable {
         return (partId < 0) ? -partId : partId;
     }
 
+    /**
+     * Ensures partition id is valid.
+     *
+     * @param partId Partition.
+     * @throws IllegalArgumentException If partition id is invalid.
+     */
+    private void ensureValidPartition(int partId) {
+        if (partId < 0 || partId >= partitions) {
+            throw new IllegalArgumentException(
+                    IgniteStringFormatter.format(
+                            "Invalid partition [partition={}, minValue={}, maxValue={}].",
+                            partId,
+                            0,
+                            partitions - 1
+                    )
+            );
+        }
+    }
+
     /**
      * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
      *