You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/11 13:28:30 UTC

[ignite-3] branch main updated: IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e75022ec99 IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)
e75022ec99 is described below

commit e75022ec99689a08a042e2ecedec6e4211dbfecf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Oct 11 16:28:25 2022 +0300

    IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)
---
 .../internal/table/distributed/TableManager.java   |  34 +++----
 .../table/distributed/TableMessageGroup.java       |  26 +++++-
 ...st.java => ReadOnlyMultiRowReplicaRequest.java} |  25 ++----
 ...> ReadOnlyScanRetrieveBatchReplicaRequest.java} |  23 +----
 ...t.java => ReadOnlySingleRowReplicaRequest.java} |  25 ++----
 .../request/ScanRetrieveBatchReplicaRequest.java   |  56 ++++++++++--
 .../replicator/PartitionReplicaListener.java       | 100 ++++++++++++++++++++-
 7 files changed, 200 insertions(+), 89 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 78cdc720e5..9d10eb31bb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1668,15 +1668,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         return true;
                     }
 
-                    int part = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
+                    int partId = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
                     UUID tblId = extractTableId(pendingAssignmentsWatchEvent.key(), PENDING_ASSIGNMENTS_PREFIX);
 
-                    String partId = partitionRaftGroupName(tblId, part);
+                    String grpId = partitionRaftGroupName(tblId, partId);
 
                     // Assignments of the pending rebalance that we received through the meta storage watch mechanism.
                     Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
 
-                    var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join();
+                    var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(grpId)).join();
 
                     assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision()
                             : "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision.";
@@ -1691,12 +1691,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     + tbl.internalTable().storage().getClass().getName();
 
                     // Stable assignments from the meta store, which revision is bounded by the current pending event.
-                    byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId),
+                    byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(grpId),
                             pendingAssignmentsWatchEvent.revision()).join().value();
 
                     Set<ClusterNode> assignments = stableAssignments == null
                             // This is for the case when the first rebalance occurs.
-                            ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
+                            ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
                             : ByteUtils.fromBytes(stableAssignments);
 
                     ClusterNode localMember = raftMgr.topologyService().localMember();
@@ -1710,10 +1710,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     try {
                         LOG.info("Received update on pending assignments. Check if new raft group should be started"
                                         + " [key={}, partition={}, table={}, localMemberAddress={}]",
-                                pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
+                                pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
 
                         if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
-                            MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part);
+                            MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(partId);
 
                             RaftGroupOptions groupOptions = groupOptionsForPartition(
                                     tbl.internalTable(),
@@ -1724,7 +1724,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                             RaftGroupListener raftGrpLsnr = new PartitionListener(
                                     partitionStorage,
-                                    tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(part),
+                                    tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
                                     txManager,
                                     primaryIndex
                             );
@@ -1732,16 +1732,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
                                     metaStorageMgr,
                                     tblCfg,
+                                    grpId,
                                     partId,
-                                    part,
                                     busyLock,
-                                    movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
+                                    movePartition(() -> tbl.internalTable().partitionRaftGroupService(partId)),
                                     TableManager.this::calculateAssignments,
                                     rebalanceScheduler
                             );
 
                             raftMgr.startRaftGroupNode(
-                                    partId,
+                                    grpId,
                                     assignments,
                                     raftGrpLsnr,
                                     raftGrpEvtsLsnr,
@@ -1750,16 +1750,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         }
 
                         if (replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
-                            MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part);
+                            MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(partId);
 
-                            replicaMgr.startReplica(partId,
+                            replicaMgr.startReplica(grpId,
                                     new PartitionReplicaListener(
                                             partitionStorage,
-                                            tbl.internalTable().partitionRaftGroupService(part),
+                                            tbl.internalTable().partitionRaftGroupService(partId),
                                             txManager,
                                             lockMgr,
-                                            part,
                                             partId,
+                                            grpId,
                                             tblId,
                                             primaryIndex,
                                             clock
@@ -1778,7 +1778,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
 
-                    RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(part);
+                    RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(partId);
 
                     IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
 
@@ -1786,7 +1786,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     if (localMember.address().equals(leaderWithTerm.get1().address())) {
                         LOG.info("Current node={} is the leader of partition raft group={}. "
                                         + "Initiate rebalance process for partition={}, table={}",
-                                localMember.address(), partId, part, tbl.name());
+                                localMember.address(), grpId, partId, tbl.name());
 
                         partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
                     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index f89f6fb6fd..3eb11ed25f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -26,11 +26,14 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
-import org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
-import org.apache.ignite.internal.table.distributed.replication.request.ScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.network.annotations.MessageGroup;
 
 /**
@@ -54,12 +57,12 @@ public interface TableMessageGroup {
     short RW_DUAL_ROW_REPLICA_REQUEST = 2;
 
     /**
-     * Message type for {@link ScanRetrieveBatchReplicaRequest}.
+     * Message type for {@link ReadWriteScanRetrieveBatchReplicaRequest}.
      */
     short RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 3;
 
     /**
-     * Message type for {@link ScanCloseReplicaRequest}.
+     * Message type for {@link ReadWriteScanCloseReplicaRequest}.
      */
     short RW_SCAN_CLOSE_REPLICA_REQUEST = 4;
 
@@ -73,6 +76,21 @@ public interface TableMessageGroup {
      */
     short HAS_DATA_RESPONSE = 6;
 
+    /**
+     * Message type for {@link ReadOnlySingleRowReplicaRequest}.
+     */
+    short RO_SINGLE_ROW_REPLICA_REQUEST = 7;
+
+    /**
+     * Message type for {@link ReadOnlyMultiRowReplicaRequest}.
+     */
+    short RO_MULTI_ROW_REPLICA_REQUEST = 8;
+
+    /**
+     * Message type for {@link ReadOnlyScanRetrieveBatchReplicaRequest}.
+     */
+    short RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 9;
+
     /**
      * Message type for {@link SnapshotMetaRequest}.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
similarity index 54%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
index 4c147793af..ac377e98bf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
@@ -17,27 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Scan retrieve batch replica request.
+ * Read only multi row replica request.
  */
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
-    /** Batch size. */
-    int batchSize();
-
-    /** The id uniquely determines a cursor for the transaction. */
-    long scanId();
-
-    /**
-     * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
-     * ignored.
-     *
-     * @return Row filter predicate.
-     */
-    @Marshallable
-    Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_MULTI_ROW_REPLICA_REQUEST)
+public interface ReadOnlyMultiRowReplicaRequest extends MultipleRowReplicaRequest, ReadOnlyReplicaRequest {
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
index 4c147793af..37c1d37239 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -17,27 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Scan retrieve batch replica request.
  */
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
-    /** Batch size. */
-    int batchSize();
-
-    /** The id uniquely determines a cursor for the transaction. */
-    long scanId();
-
-    /**
-     * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
-     * ignored.
-     *
-     * @return Row filter predicate.
-     */
-    @Marshallable
-    Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
+public interface ReadOnlyScanRetrieveBatchReplicaRequest extends ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest {
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
similarity index 54%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
index 4c147793af..6589630d9d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
@@ -17,27 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Scan retrieve batch replica request.
+ * Read only single row replica request.
  */
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
-    /** Batch size. */
-    int batchSize();
-
-    /** The id uniquely determines a cursor for the transaction. */
-    long scanId();
-
-    /**
-     * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
-     * ignored.
-     *
-     * @return Row filter predicate.
-     */
-    @Marshallable
-    Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_SINGLE_ROW_REPLICA_REQUEST)
+public interface ReadOnlySingleRowReplicaRequest extends SingleRowReplicaRequest, ReadOnlyReplicaRequest {
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
index 4c147793af..70d53c159d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
-import java.util.function.Predicate;
+import java.util.BitSet;
+import java.util.UUID;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.network.annotations.Marshallable;
 
 /**
@@ -33,11 +35,53 @@ public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
     long scanId();
 
     /**
-     * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
-     * ignored.
+     * Gets an index to use fot the retrieve request.
      *
-     * @return Row filter predicate.
+     * @return Index id.
      */
     @Marshallable
-    Predicate<BinaryRow> rowFilter();
+    UUID indexToUse();
+
+    /**
+     * Gets a key which is used for exact comparison in the index.
+     *
+     * @return Key to search.
+     */
+    @Marshallable
+    BinaryTuple exactKey();
+
+    /**
+     * Gets a lower bound to choose entries from {@link SortedIndexStorage}. Exclusivity is controlled by a {@link
+     * SortedIndexStorage#GREATER_OR_EQUAL} or {@link SortedIndexStorage#GREATER} flag. {@code null} means unbounded.
+     *
+     * @return lower bound.
+     */
+    @Marshallable
+    BinaryTuple lowerBound();
+
+    /**
+     * Gets an upper bound to choose entries from {@link SortedIndexStorage}. Upper bound. Exclusivity is controlled by a {@link
+     * SortedIndexStorage#LESS} or {@link SortedIndexStorage#LESS_OR_EQUAL} flag. {@code null} means unbounded.
+     *
+     * @return upper bound.
+     */
+    @Marshallable
+    BinaryTuple upperBound();
+
+    /**
+     * Gets control flags for {@link SortedIndexStorage}. {@link SortedIndexStorage#GREATER} | {@link SortedIndexStorage#LESS} by default.
+     * Other available values are {@link SortedIndexStorage#GREATER_OR_EQUAL}, {@link SortedIndexStorage#LESS_OR_EQUAL}.
+     * TODO: IGNITE-17748 Refresh the summary when the meaning changes, after the issue will be implemented.
+     *
+     * @return Flags to determine a scan order.
+     */
+    int flags();
+
+    /**
+     * Gets bitset to include columns.
+     *
+     * @return Bitset to include columns.
+     */
+    @Marshallable
+    BitSet columnsToInclude();
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1aa567cec5..cf429c2bfd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -51,12 +51,16 @@ import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
@@ -184,12 +188,100 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return processTxFinishAction((TxFinishReplicaRequest) request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof ReadOnlySingleRowReplicaRequest) {
+                        return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
+                        return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request);
                     } else {
                         throw new UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
+        PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+                id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
+
+        while (batchRows.size() < batchCount && cursor.hasNext()) {
+            BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), null);
+
+            if (resolvedReadResult != null) {
+                batchRows.add(resolvedReadResult);
+            }
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+        }
+
+        //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+        RowId rowId = rowIdByKey(indexId, searchKey);
+
+        BinaryRow result = rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null;
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+        Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br -> br.keySlice()).collect(
+                Collectors.toList());
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+        }
+
+        ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+        for (ByteBuffer searchKey : keyRows) {
+            //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+            RowId rowId = rowIdByKey(indexId, searchKey);
+
+            result.add(rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null);
+        }
+
+        return CompletableFuture.completedFuture(result);
+    }
+
     /**
      * Close all cursors connected with a transaction.
      *
@@ -264,11 +356,13 @@ public class PartitionReplicaListener implements ReplicaListener {
         return lockManager.acquire(txId, new LockKey(tableId), LockMode.S).thenCompose(tblLock -> {
             ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
 
-            PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId, id -> mvDataStorage.scan(request.rowFilter(),
-                    HybridTimestamp.MAX_VALUE));
+            //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
+            PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+                    id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
 
-            for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            while (batchRows.size() < batchCount && cursor.hasNext()) {
                 BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), txId);
+
                 if (resolvedReadResult != null) {
                     batchRows.add(resolvedReadResult);
                 }