You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/06/23 11:36:43 UTC

[ignite-3] branch main updated: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 683f9a6f20 IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)
683f9a6f20 is described below

commit 683f9a6f20613e727be56b08309b95e77147a9f4
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jun 23 14:36:37 2023 +0300

    IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)
---
 .../runner/app/ItTableApiContractTest.java         |  28 +++++
 .../ignite/internal/table/ItRoReadsTest.java       |   4 +-
 .../ItInternalTableReadOnlyOperationsTest.java     |  56 +++++++--
 .../replicator/PartitionReplicaListener.java       |   4 +-
 .../distributed/storage/InternalTableImpl.java     | 127 +++++++++++++--------
 .../table/distributed/storage/RowBatch.java        |  77 +++++++++++++
 .../replication/PartitionReplicaListenerTest.java  |   2 +-
 .../distributed/storage/InternalTableImplTest.java |  76 ++++++++++++
 8 files changed, 315 insertions(+), 59 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 05f86851f8..d123b28903 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -17,16 +17,21 @@
 
 package org.apache.ignite.internal.runner.app;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -246,6 +251,29 @@ public class ItTableApiContractTest extends ClusterPerClassIntegrationTest {
         assertThrows(TableAlreadyExistsException.class, () -> futureResult(tableFut2));
     }
 
+    @Test
+    public void testGetAll() {
+        RecordView<Tuple> tbl = ignite.tables().table(TABLE_NAME).recordView();
+
+        var recs = IntStream.range(0, 5)
+                .mapToObj(i -> Tuple.create().set("name", "id_" + i * 2).set("balance", i * 2))
+                .collect(toList());
+
+        tbl.upsertAll(null, recs);
+
+        var keys = IntStream.range(0, 10)
+                .mapToObj(i -> Tuple.create().set("name", "id_" + i))
+                .collect(toList());
+
+        List<Tuple> res = (List<Tuple>) tbl.getAll(null, keys);
+
+        // TODO: IGNITE-19693 should be: "id_0", null, "id_2", null, "id_4", null, "id_6", null, "id_8", null
+        assertThat(
+                res.stream().map(tuple -> tuple.stringValue(0)).collect(toList()),
+                contains("id_0", "id_2", "id_4", "id_6", "id_8")
+        );
+    }
+
     private TableManager tableManager() {
         return (TableManager) ignite.tables();
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 7e2e74a5ae..01d4ab0183 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -327,7 +327,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
 
-        assertEquals(res.size(), 0);
+        assertEquals(res.size(), 3);
 
         node.transactions().runInTransaction(txs -> {
             for (int i = 0; i < 15; i++) {
@@ -362,7 +362,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
 
         Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
 
-        assertEquals(res.size(), 0);
+        assertEquals(res.size(), 3);
 
         populateData(node(), keyValueView, false);
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 576e1f9a4b..482ab45518 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.distributed;
 
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,6 +34,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -144,8 +149,36 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
     public void testReadOnlyGetAllNonExistingKeysWithReadTimestamp() {
         mockReadOnlyMultiRowRequest();
 
-        assertEquals(0,
-                internalTbl.getAll(Collections.singleton(createKeyRow(0)), CLOCK.now(), mock(ClusterNode.class)).join().size()
+        assertThat(
+                internalTbl.getAll(List.of(createKeyRow(0)), CLOCK.now(), mock(ClusterNode.class)),
+                willBe(contains(nullValue()))
+        );
+    }
+
+    @Test
+    public void testReadOnlyGetAllMixExistingKeysWithTx() {
+        mockReadOnlyMultiRowRequest();
+
+        assertThat(
+                internalTbl.getAll(
+                        List.of(createKeyRow(0), createKeyRow(1), createKeyRow(2), createKeyRow(3)),
+                        readOnlyTx
+                ),
+                willBe(contains(nullValue(), equalTo(ROW_1), equalTo(ROW_2), nullValue()))
+        );
+    }
+
+    @Test
+    public void testReadOnlyGetAllMixExistingKeysWithReadTimestamp() {
+        mockReadOnlyMultiRowRequest();
+
+        assertThat(
+                internalTbl.getAll(
+                        List.of(createKeyRow(0), createKeyRow(1), createKeyRow(2), createKeyRow(3)),
+                        CLOCK.now(),
+                        mock(ClusterNode.class)
+                ),
+                willBe(contains(nullValue(), equalTo(ROW_1), equalTo(ROW_2), nullValue()))
         );
     }
 
@@ -153,8 +186,9 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
     public void testReadOnlyGetAllNonExistingKeysWithTx() {
         mockReadOnlyMultiRowRequest();
 
-        assertEquals(0,
-                internalTbl.getAll(Collections.singleton(createKeyRow(0)), readOnlyTx).join().size()
+        assertThat(
+                internalTbl.getAll(List.of(createKeyRow(0)), readOnlyTx),
+                willBe(contains(nullValue()))
         );
     }
 
@@ -268,16 +302,22 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
         List<BinaryRow> rowStore = List.of(ROW_1, ROW_2);
 
         when(replicaService.invoke(any(ClusterNode.class), any(ReadOnlyMultiRowReplicaRequest.class))).thenAnswer(args -> {
-            List<BinaryRow> result = new ArrayList<>();
+            Collection<BinaryRow> requestedRows = args.getArgument(1, ReadOnlyMultiRowReplicaRequest.class).binaryRows();
 
-            for (BinaryRow row : rowStore) {
-                for (BinaryRow searchRow : args.getArgument(1, ReadOnlyMultiRowReplicaRequest.class).binaryRows()) {
+            List<BinaryRow> result = new ArrayList<>(requestedRows.size());
+
+            for (BinaryRow searchRow : requestedRows) {
+                BinaryRow resultRow = null;
+
+                for (BinaryRow row : rowStore) {
                     if (KEY_EXTRACTOR.apply(row).byteBuffer().equals(KEY_EXTRACTOR.apply(searchRow).byteBuffer())) {
-                        result.add(row);
+                        resultRow = row;
 
                         break;
                     }
                 }
+
+                result.add(resultRow);
             }
 
             return CompletableFuture.completedFuture(result);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b19116c190..367b9aded2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -575,9 +575,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
                     BinaryRow resolvedReadResult = resolutionFut.join();
 
-                    if (resolvedReadResult != null) {
-                        result.add(resolvedReadResult);
-                    }
+                    result.add(resolvedReadResult);
                 }
 
                 return result;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 36c68c3b93..06724d110b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
 import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
@@ -33,6 +34,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -283,7 +285,7 @@ public class InternalTableImpl implements InternalTable {
             Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx,
             IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
-            Function<CompletableFuture<Object>[], CompletableFuture<T>> reducer
+            Function<Collection<RowBatch>, CompletableFuture<T>> reducer
     ) {
         // Check whether proposed tx is read-only. Complete future exceptionally if true.
         // Attempting to enlist a read-only in a read-write transaction does not corrupt the transaction itself, thus read-write transaction
@@ -306,14 +308,13 @@ public class InternalTableImpl implements InternalTable {
 
         final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
-        Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
 
-        CompletableFuture<Object>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+        for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+            int partitionId = partitionRowBatch.getIntKey();
+            RowBatch rowBatch = partitionRowBatch.getValue();
 
-        int batchNum = 0;
-
-        for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) {
-            TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey());
+            TablePartitionId partGroupId = new TablePartitionId(tableId, partitionId);
 
             IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = tx0.enlistedNodeAndTerm(partGroupId);
 
@@ -322,7 +323,7 @@ public class InternalTableImpl implements InternalTable {
             if (primaryReplicaAndTerm != null) {
                 TablePartitionId commitPart = tx.commitPartition();
 
-                ReplicaRequest request = op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, primaryReplicaAndTerm.get2());
+                ReplicaRequest request = op.apply(commitPart, rowBatch.requestedRows, tx0, partGroupId, primaryReplicaAndTerm.get2());
 
                 try {
                     fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -334,16 +335,16 @@ public class InternalTableImpl implements InternalTable {
             } else {
                 fut = enlistWithRetry(
                         tx0,
-                        partToRows.getIntKey(),
-                        (commitPart, term) -> op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, term),
+                        partitionId,
+                        (commitPart, term) -> op.apply(commitPart, rowBatch.requestedRows, tx0, partGroupId, term),
                         ATTEMPTS_TO_ENLIST_PARTITION
                 );
             }
 
-            futures[batchNum++] = fut;
+            rowBatch.resultFuture = fut;
         }
 
-        CompletableFuture<T> fut = reducer.apply(futures);
+        CompletableFuture<T> fut = reducer.apply(rowBatchByPartitionId.values());
 
         return postEnlist(fut, implicit, tx0);
     }
@@ -581,7 +582,8 @@ public class InternalTableImpl implements InternalTable {
                             .requestType(RequestType.RW_GET_ALL)
                             .timestampLong(clock.nowLong())
                             .build(),
-                    this::collectMultiRowsResponses);
+                    InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+            );
         }
     }
 
@@ -592,27 +594,23 @@ public class InternalTableImpl implements InternalTable {
             HybridTimestamp readTimestamp,
             ClusterNode recipientNode
     ) {
-        Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
-
-        CompletableFuture<Object>[] futures = new CompletableFuture[keyRowsByPartition.size()];
-
-        int batchNum = 0;
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
 
-        for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) {
-            ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partToRows.getIntKey()).groupId();
+        for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+            ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partitionRowBatch.getIntKey()).groupId();
 
             CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest()
                     .groupId(partGroupId)
-                    .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+                    .binaryRowsBytes(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
                     .requestType(RequestType.RO_GET_ALL)
                     .readTimestampLong(readTimestamp.longValue())
                     .build()
             );
 
-            futures[batchNum++] = fut;
+            partitionRowBatch.getValue().resultFuture = fut;
         }
 
-        return collectMultiRowsResponses(futures);
+        return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
     }
 
     private static List<ByteBuffer> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
@@ -649,7 +647,8 @@ public class InternalTableImpl implements InternalTable {
                 rows,
                 tx,
                 this::upsertAllInternal,
-                CompletableFuture::allOf);
+                RowBatch::allResultFutures
+        );
     }
 
     /** {@inheritDoc} */
@@ -719,7 +718,8 @@ public class InternalTableImpl implements InternalTable {
                         .requestType(RequestType.RW_INSERT_ALL)
                         .timestampLong(clock.nowLong())
                         .build(),
-                this::collectMultiRowsResponses);
+                InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+        );
     }
 
     /** {@inheritDoc} */
@@ -846,7 +846,8 @@ public class InternalTableImpl implements InternalTable {
                         .requestType(RequestType.RW_DELETE_ALL)
                         .timestampLong(clock.nowLong())
                         .build(),
-                this::collectMultiRowsResponses);
+                InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+        );
     }
 
     /** {@inheritDoc} */
@@ -867,7 +868,8 @@ public class InternalTableImpl implements InternalTable {
                         .requestType(RequestType.RW_DELETE_EXACT_ALL)
                         .timestampLong(clock.nowLong())
                         .build(),
-                this::collectMultiRowsResponses);
+                InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+        );
     }
 
     @Override
@@ -1077,19 +1079,20 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Map rows to partitions.
+     * Creates batches of rows for processing, grouped by partition ID.
      *
      * @param rows Rows.
-     * @return Partition -%gt; rows mapping.
      */
-    private Int2ObjectOpenHashMap<List<BinaryRow>> mapRowsToPartitions(Collection<BinaryRowEx> rows) {
-        Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>();
+    Int2ObjectMap<RowBatch> toRowBatchByPartitionId(Collection<BinaryRowEx> rows) {
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = new Int2ObjectOpenHashMap<>();
+
+        int i = 0;
 
-        for (BinaryRowEx keyRow : rows) {
-            keyRowsByPartition.computeIfAbsent(partitionId(keyRow), k -> new ArrayList<>()).add(keyRow);
+        for (BinaryRowEx row : rows) {
+            rowBatchByPartitionId.computeIfAbsent(partitionId(row), partitionId -> new RowBatch()).add(row, i++);
         }
 
-        return keyRowsByPartition;
+        return rowBatchByPartitionId;
     }
 
     /** {@inheritDoc} */
@@ -1206,25 +1209,59 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+     * Gathers the results of batch processing into a single collection of rows, without restoring the requested order.
      *
-     * @param futs Futures.
-     * @return Row collection.
+     * @param rowBatches Row batches.
+     * @return Future of collecting results.
      */
-    private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
-        return CompletableFuture.allOf(futs)
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(Collection<RowBatch> rowBatches) {
+        return allResultFutures(rowBatches)
                 .thenApply(response -> {
-                    Collection<BinaryRow> list = new ArrayList<>(futs.length);
+                    var result = new ArrayList<BinaryRow>(rowBatches.size());
+
+                    for (RowBatch rowBatch : rowBatches) {
+                        Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult();
+
+                        if (batchResult == null) {
+                            continue;
+                        }
+
+                        result.addAll(batchResult);
+                    }
+
+                    return result;
+                });
+    }
+
+    /**
+     * Gathers the results of batch processing into a single collection of rows, restoring the order in which the rows were
+     * requested.
+     *
+     * @param rowBatches Row batches.
+     * @return Future of collecting results.
+     */
+    static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
+        return allResultFutures(rowBatches)
+                .thenApply(response -> {
+                    var result = new BinaryRow[RowBatch.getTotalRequestedRowSize(rowBatches)];
+
+                    for (RowBatch rowBatch : rowBatches) {
+                        Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult();
+
+                        assert batchResult != null;
+
+                        assert batchResult.size() == rowBatch.requestedRows.size() :
+                                "batchResult=" + batchResult.size() + ", requestedRows=" + rowBatch.requestedRows.size();
 
-                    for (CompletableFuture<Object> future : futs) {
-                        Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+                        int i = 0;
 
-                        if (values != null) {
-                            list.addAll(values);
+                        for (BinaryRow resultRow : batchResult) {
+                            result[rowBatch.getOriginalRowIndex(i++)] = resultRow;
                         }
                     }
 
-                    return list;
+                    // Use Arrays#asList to avoid copying the array.
+                    return Arrays.asList(result);
                 });
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
new file mode 100644
index 0000000000..09214d860a
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Batch of rows taken from an original row collection, together with each row's index in it, so the original order can be restored.
+ *
+ * <p>NOTE: Not thread-safe.
+ */
+class RowBatch {
+    /** Batch of rows from the original collection of rows. */
+    final List<BinaryRow> requestedRows = new ArrayList<>();
+
+    /** Indexes in the original row collection of the rows from {@link #requestedRows}, in the same order. */
+    final IntList originalRowOrder = new IntArrayList();
+
+    /**
+     * Future of the result of processing {@link #requestedRows}; {@code null} until assigned, and may itself complete with {@code null}.
+     */
+    @Nullable CompletableFuture<Object> resultFuture;
+
+    void add(BinaryRow row, int originalIndex) {
+        requestedRows.add(row);
+        originalRowOrder.add(originalIndex);
+    }
+
+    @Nullable Object getCompletedResult() {
+        CompletableFuture<Object> resultFuture = this.resultFuture;
+
+        assert resultFuture != null;
+        assert resultFuture.isDone();
+
+        return resultFuture.join();
+    }
+
+    int getOriginalRowIndex(int resultRowIndex) {
+        return originalRowOrder.getInt(resultRowIndex);
+    }
+
+    static CompletableFuture<Void> allResultFutures(Collection<RowBatch> batches) {
+        return CompletableFuture.allOf(batches.stream().map(rowBatch -> rowBatch.resultFuture).toArray(CompletableFuture[]::new));
+    }
+
+    static int getTotalRequestedRowSize(Collection<RowBatch> batches) {
+        int totalSize = 0;
+
+        for (RowBatch batch : batches) {
+            totalSize += batch.requestedRows.size();
+        }
+
+        return totalSize;
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 68dfedb73e..e218090950 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1216,7 +1216,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     : (insertFirst ? allRows : Set.of());
             Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.nowLong()));
 
-            assertEquals(expected.size(), res.size());
+            assertEquals(allRows.size(), res.size());
             for (BinaryRow e : expected) {
                 // TODO: IGNITE-19430 - should there be an assertion in the next line?
                 res.contains(e);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 6c1b529c6d..5aed2c8c28 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -17,16 +17,29 @@
 
 package org.apache.ignite.internal.table.distributed.storage;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder;
+import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithoutRestoreOrder;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import java.util.List;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -81,4 +94,67 @@ public class InternalTableImplTest {
         verify(safeTime0).close();
         verify(storageIndex0).close();
     }
+
+    @Test
+    void testRowBatchByPartitionId() {
+        InternalTableImpl internalTable = new InternalTableImpl(
+                "test",
+                1,
+                Int2ObjectMaps.emptyMap(),
+                3,
+                s -> mock(ClusterNode.class),
+                mock(TxManager.class),
+                mock(MvTableStorage.class),
+                mock(TxStateTableStorage.class),
+                mock(ReplicaService.class),
+                mock(HybridClock.class)
+        );
+
+        List<BinaryRowEx> originalRows = List.of(
+                // Rows for 0 partition.
+                createBinaryRows(0),
+                createBinaryRows(0),
+                // Rows for 1 partition.
+                createBinaryRows(1),
+                // Rows for 2 partition.
+                createBinaryRows(2),
+                createBinaryRows(2),
+                createBinaryRows(2)
+        );
+
+        // We will get batches for processing and check them.
+        Int2ObjectMap<RowBatch> rowBatchByPartitionId = internalTable.toRowBatchByPartitionId(originalRows);
+
+        assertThat(rowBatchByPartitionId.get(0).requestedRows, hasSize(2));
+        assertThat(rowBatchByPartitionId.get(0).originalRowOrder, contains(0, 1));
+
+        assertThat(rowBatchByPartitionId.get(1).requestedRows, hasSize(1));
+        assertThat(rowBatchByPartitionId.get(1).originalRowOrder, contains(2));
+
+        assertThat(rowBatchByPartitionId.get(2).requestedRows, hasSize(3));
+        assertThat(rowBatchByPartitionId.get(2).originalRowOrder, contains(3, 4, 5));
+
+        // Collect the result and check it.
+        rowBatchByPartitionId.get(0).resultFuture = completedFuture(List.of(originalRows.get(0), originalRows.get(1)));
+        rowBatchByPartitionId.get(1).resultFuture = completedFuture(List.of(originalRows.get(2)));
+        rowBatchByPartitionId.get(2).resultFuture = completedFuture(List.of(originalRows.get(3), originalRows.get(4), originalRows.get(5)));
+
+        assertThat(
+                collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values()),
+                willBe(equalTo(originalRows))
+        );
+
+        assertThat(
+                collectMultiRowsResponsesWithoutRestoreOrder(rowBatchByPartitionId.values()),
+                willBe(hasItems(originalRows.toArray(BinaryRowEx[]::new)))
+        );
+    }
+
+    private static BinaryRowEx createBinaryRows(int colocationHash) {
+        BinaryRowEx rowEx = mock(BinaryRowEx.class);
+
+        when(rowEx.colocationHash()).thenReturn(colocationHash);
+
+        return rowEx;
+    }
 }