You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/06/23 11:36:43 UTC
[ignite-3] branch main updated: IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 683f9a6f20 IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)
683f9a6f20 is described below
commit 683f9a6f20613e727be56b08309b95e77147a9f4
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jun 23 14:36:37 2023 +0300
IGNITE-16004 Preserve key order in InternalTableImpl#collectMultiRowsResponses (#2236)
---
.../runner/app/ItTableApiContractTest.java | 28 +++++
.../ignite/internal/table/ItRoReadsTest.java | 4 +-
.../ItInternalTableReadOnlyOperationsTest.java | 56 +++++++--
.../replicator/PartitionReplicaListener.java | 4 +-
.../distributed/storage/InternalTableImpl.java | 127 +++++++++++++--------
.../table/distributed/storage/RowBatch.java | 77 +++++++++++++
.../replication/PartitionReplicaListenerTest.java | 2 +-
.../distributed/storage/InternalTableImplTest.java | 76 ++++++++++++
8 files changed, 315 insertions(+), 59 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 05f86851f8..d123b28903 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -17,16 +17,21 @@
package org.apache.ignite.internal.runner.app;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -246,6 +251,29 @@ public class ItTableApiContractTest extends ClusterPerClassIntegrationTest {
assertThrows(TableAlreadyExistsException.class, () -> futureResult(tableFut2));
}
+ @Test
+ public void testGetAll() {
+ RecordView<Tuple> tbl = ignite.tables().table(TABLE_NAME).recordView();
+
+ var recs = IntStream.range(0, 5)
+ .mapToObj(i -> Tuple.create().set("name", "id_" + i * 2).set("balance", i * 2))
+ .collect(toList());
+
+ tbl.upsertAll(null, recs);
+
+ var keys = IntStream.range(0, 10)
+ .mapToObj(i -> Tuple.create().set("name", "id_" + i))
+ .collect(toList());
+
+ List<Tuple> res = (List<Tuple>) tbl.getAll(null, keys);
+
+ // TODO: IGNITE-19693 should be: "id_0", null, "id_2", null, "id_4", null, "id_6", null, "id_8", null
+ assertThat(
+ res.stream().map(tuple -> tuple.stringValue(0)).collect(toList()),
+ contains("id_0", "id_2", "id_4", "id_6", "id_8")
+ );
+ }
+
private TableManager tableManager() {
return (TableManager) ignite.tables();
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 7e2e74a5ae..01d4ab0183 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -327,7 +327,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
- assertEquals(res.size(), 0);
+ assertEquals(res.size(), 3);
node.transactions().runInTransaction(txs -> {
for (int i = 0; i < 15; i++) {
@@ -362,7 +362,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
Collection<BinaryRow> res = internalTable.getAll(rowsToSearch, node.clock().now(), node.node()).get();
- assertEquals(res.size(), 0);
+ assertEquals(res.size(), 3);
populateData(node(), keyValueView, false);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 576e1f9a4b..482ab45518 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -17,10 +17,14 @@
package org.apache.ignite.distributed;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,6 +34,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -144,8 +149,36 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
public void testReadOnlyGetAllNonExistingKeysWithReadTimestamp() {
mockReadOnlyMultiRowRequest();
- assertEquals(0,
- internalTbl.getAll(Collections.singleton(createKeyRow(0)), CLOCK.now(), mock(ClusterNode.class)).join().size()
+ assertThat(
+ internalTbl.getAll(List.of(createKeyRow(0)), CLOCK.now(), mock(ClusterNode.class)),
+ willBe(contains(nullValue()))
+ );
+ }
+
+ @Test
+ public void testReadOnlyGetAllMixExistingKeysWithTx() {
+ mockReadOnlyMultiRowRequest();
+
+ assertThat(
+ internalTbl.getAll(
+ List.of(createKeyRow(0), createKeyRow(1), createKeyRow(2), createKeyRow(3)),
+ readOnlyTx
+ ),
+ willBe(contains(nullValue(), equalTo(ROW_1), equalTo(ROW_2), nullValue()))
+ );
+ }
+
+ @Test
+ public void testReadOnlyGetAllMixExistingKeysWithReadTimestamp() {
+ mockReadOnlyMultiRowRequest();
+
+ assertThat(
+ internalTbl.getAll(
+ List.of(createKeyRow(0), createKeyRow(1), createKeyRow(2), createKeyRow(3)),
+ CLOCK.now(),
+ mock(ClusterNode.class)
+ ),
+ willBe(contains(nullValue(), equalTo(ROW_1), equalTo(ROW_2), nullValue()))
);
}
@@ -153,8 +186,9 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
public void testReadOnlyGetAllNonExistingKeysWithTx() {
mockReadOnlyMultiRowRequest();
- assertEquals(0,
- internalTbl.getAll(Collections.singleton(createKeyRow(0)), readOnlyTx).join().size()
+ assertThat(
+ internalTbl.getAll(List.of(createKeyRow(0)), readOnlyTx),
+ willBe(contains(nullValue()))
);
}
@@ -268,16 +302,22 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
List<BinaryRow> rowStore = List.of(ROW_1, ROW_2);
when(replicaService.invoke(any(ClusterNode.class), any(ReadOnlyMultiRowReplicaRequest.class))).thenAnswer(args -> {
- List<BinaryRow> result = new ArrayList<>();
+ Collection<BinaryRow> requestedRows = args.getArgument(1, ReadOnlyMultiRowReplicaRequest.class).binaryRows();
- for (BinaryRow row : rowStore) {
- for (BinaryRow searchRow : args.getArgument(1, ReadOnlyMultiRowReplicaRequest.class).binaryRows()) {
+ List<BinaryRow> result = new ArrayList<>(requestedRows.size());
+
+ for (BinaryRow searchRow : requestedRows) {
+ BinaryRow resultRow = null;
+
+ for (BinaryRow row : rowStore) {
if (KEY_EXTRACTOR.apply(row).byteBuffer().equals(KEY_EXTRACTOR.apply(searchRow).byteBuffer())) {
- result.add(row);
+ resultRow = row;
break;
}
}
+
+ result.add(resultRow);
}
return CompletableFuture.completedFuture(result);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b19116c190..367b9aded2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -575,9 +575,7 @@ public class PartitionReplicaListener implements ReplicaListener {
for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
BinaryRow resolvedReadResult = resolutionFut.join();
- if (resolvedReadResult != null) {
- result.add(resolvedReadResult);
- }
+ result.add(resolvedReadResult);
}
return result;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 36c68c3b93..06724d110b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
@@ -33,6 +34,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -283,7 +285,7 @@ public class InternalTableImpl implements InternalTable {
Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx,
IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
- Function<CompletableFuture<Object>[], CompletableFuture<T>> reducer
+ Function<Collection<RowBatch>, CompletableFuture<T>> reducer
) {
// Check whether proposed tx is read-only. Complete future exceptionally if true.
// Attempting to enlist a read-only in a read-write transaction does not corrupt the transaction itself, thus read-write transaction
@@ -306,14 +308,13 @@ public class InternalTableImpl implements InternalTable {
final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
- Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
+ Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
- CompletableFuture<Object>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+ for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+ int partitionId = partitionRowBatch.getIntKey();
+ RowBatch rowBatch = partitionRowBatch.getValue();
- int batchNum = 0;
-
- for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) {
- TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey());
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partitionId);
IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = tx0.enlistedNodeAndTerm(partGroupId);
@@ -322,7 +323,7 @@ public class InternalTableImpl implements InternalTable {
if (primaryReplicaAndTerm != null) {
TablePartitionId commitPart = tx.commitPartition();
- ReplicaRequest request = op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, primaryReplicaAndTerm.get2());
+ ReplicaRequest request = op.apply(commitPart, rowBatch.requestedRows, tx0, partGroupId, primaryReplicaAndTerm.get2());
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -334,16 +335,16 @@ public class InternalTableImpl implements InternalTable {
} else {
fut = enlistWithRetry(
tx0,
- partToRows.getIntKey(),
- (commitPart, term) -> op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, term),
+ partitionId,
+ (commitPart, term) -> op.apply(commitPart, rowBatch.requestedRows, tx0, partGroupId, term),
ATTEMPTS_TO_ENLIST_PARTITION
);
}
- futures[batchNum++] = fut;
+ rowBatch.resultFuture = fut;
}
- CompletableFuture<T> fut = reducer.apply(futures);
+ CompletableFuture<T> fut = reducer.apply(rowBatchByPartitionId.values());
return postEnlist(fut, implicit, tx0);
}
@@ -581,7 +582,8 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_GET_ALL)
.timestampLong(clock.nowLong())
.build(),
- this::collectMultiRowsResponses);
+ InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+ );
}
}
@@ -592,27 +594,23 @@ public class InternalTableImpl implements InternalTable {
HybridTimestamp readTimestamp,
ClusterNode recipientNode
) {
- Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
-
- CompletableFuture<Object>[] futures = new CompletableFuture[keyRowsByPartition.size()];
-
- int batchNum = 0;
+ Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
- for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) {
- ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partToRows.getIntKey()).groupId();
+ for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
+ ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partitionRowBatch.getIntKey()).groupId();
CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest()
.groupId(partGroupId)
- .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+ .binaryRowsBytes(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
.requestType(RequestType.RO_GET_ALL)
.readTimestampLong(readTimestamp.longValue())
.build()
);
- futures[batchNum++] = fut;
+ partitionRowBatch.getValue().resultFuture = fut;
}
- return collectMultiRowsResponses(futures);
+ return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
}
private static List<ByteBuffer> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
@@ -649,7 +647,8 @@ public class InternalTableImpl implements InternalTable {
rows,
tx,
this::upsertAllInternal,
- CompletableFuture::allOf);
+ RowBatch::allResultFutures
+ );
}
/** {@inheritDoc} */
@@ -719,7 +718,8 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_INSERT_ALL)
.timestampLong(clock.nowLong())
.build(),
- this::collectMultiRowsResponses);
+ InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+ );
}
/** {@inheritDoc} */
@@ -846,7 +846,8 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_DELETE_ALL)
.timestampLong(clock.nowLong())
.build(),
- this::collectMultiRowsResponses);
+ InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+ );
}
/** {@inheritDoc} */
@@ -867,7 +868,8 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_DELETE_EXACT_ALL)
.timestampLong(clock.nowLong())
.build(),
- this::collectMultiRowsResponses);
+ InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
+ );
}
@Override
@@ -1077,19 +1079,20 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Map rows to partitions.
+ * Creates batches of rows for processing, grouped by partition ID.
*
* @param rows Rows.
- * @return Partition -%gt; rows mapping.
*/
- private Int2ObjectOpenHashMap<List<BinaryRow>> mapRowsToPartitions(Collection<BinaryRowEx> rows) {
- Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new Int2ObjectOpenHashMap<>();
+ Int2ObjectMap<RowBatch> toRowBatchByPartitionId(Collection<BinaryRowEx> rows) {
+ Int2ObjectMap<RowBatch> rowBatchByPartitionId = new Int2ObjectOpenHashMap<>();
+
+ int i = 0;
- for (BinaryRowEx keyRow : rows) {
- keyRowsByPartition.computeIfAbsent(partitionId(keyRow), k -> new ArrayList<>()).add(keyRow);
+ for (BinaryRowEx row : rows) {
+ rowBatchByPartitionId.computeIfAbsent(partitionId(row), partitionId -> new RowBatch()).add(row, i++);
}
- return keyRowsByPartition;
+ return rowBatchByPartitionId;
}
/** {@inheritDoc} */
@@ -1206,25 +1209,59 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * TODO asch keep the same order as for keys Collects multirow responses from multiple futures into a single collection IGNITE-16004.
+ * Gathers the result of batch processing into a single resulting collection of rows.
*
- * @param futs Futures.
- * @return Row collection.
+ * @param rowBatches Row batches.
+ * @return Future of collecting results.
*/
- private CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponses(CompletableFuture<Object>[] futs) {
- return CompletableFuture.allOf(futs)
+ static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithoutRestoreOrder(Collection<RowBatch> rowBatches) {
+ return allResultFutures(rowBatches)
.thenApply(response -> {
- Collection<BinaryRow> list = new ArrayList<>(futs.length);
+ var result = new ArrayList<BinaryRow>(rowBatches.size());
+
+ for (RowBatch rowBatch : rowBatches) {
+ Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult();
+
+ if (batchResult == null) {
+ continue;
+ }
+
+ result.addAll(batchResult);
+ }
+
+ return result;
+ });
+ }
+
+ /**
+ * Gathers the result of batch processing into a single resulting collection of rows, restoring order as in the requested collection of
+ * rows.
+ *
+ * @param rowBatches Row batches by partition ID.
+ * @return Future of collecting results.
+ */
+ static CompletableFuture<Collection<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
+ return allResultFutures(rowBatches)
+ .thenApply(response -> {
+ var result = new BinaryRow[RowBatch.getTotalRequestedRowSize(rowBatches)];
+
+ for (RowBatch rowBatch : rowBatches) {
+ Collection<BinaryRow> batchResult = (Collection<BinaryRow>) rowBatch.getCompletedResult();
+
+ assert batchResult != null;
+
+ assert batchResult.size() == rowBatch.requestedRows.size() :
+ "batchResult=" + batchResult.size() + ", requestedRows=" + rowBatch.requestedRows.size();
- for (CompletableFuture<Object> future : futs) {
- Collection<BinaryRow> values = (Collection<BinaryRow>) future.join();
+ int i = 0;
- if (values != null) {
- list.addAll(values);
+ for (BinaryRow resultRow : batchResult) {
+ result[rowBatch.getOriginalRowIndex(i++)] = resultRow;
}
}
- return list;
+ // Use Arrays#asList to avoid copying the array.
+ return Arrays.asList(result);
});
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
new file mode 100644
index 0000000000..09214d860a
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/RowBatch.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Batch of binary rows from the original collection of binary rows for processing, preserving the order from the original collection.
+ *
+ * <p>NOTE: Not thread-safe.
+ */
+class RowBatch {
+ /** Batch of rows from the original collection of rows. */
+ final List<BinaryRow> requestedRows = new ArrayList<>();
+
+ /** Index in the original row collection of each row from {@link #requestedRows}, in the same order. */
+ final IntList originalRowOrder = new IntArrayList();
+
+ /**
+ * Future of the result of processing the {@link #requestedRows}; {@code null} until it is set, and may complete with {@code null}.
+ */
+ @Nullable CompletableFuture<Object> resultFuture;
+
+ void add(BinaryRow row, int originalIndex) {
+ requestedRows.add(row);
+ originalRowOrder.add(originalIndex);
+ }
+
+ @Nullable Object getCompletedResult() {
+ CompletableFuture<Object> resultFuture = this.resultFuture;
+
+ assert resultFuture != null;
+ assert resultFuture.isDone();
+
+ return resultFuture.join();
+ }
+
+ int getOriginalRowIndex(int resultRowIndex) {
+ return originalRowOrder.getInt(resultRowIndex);
+ }
+
+ static CompletableFuture<Void> allResultFutures(Collection<RowBatch> batches) {
+ return CompletableFuture.allOf(batches.stream().map(rowBatch -> rowBatch.resultFuture).toArray(CompletableFuture[]::new));
+ }
+
+ static int getTotalRequestedRowSize(Collection<RowBatch> batches) {
+ int totalSize = 0;
+
+ for (RowBatch batch : batches) {
+ totalSize += batch.requestedRows.size();
+ }
+
+ return totalSize;
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 68dfedb73e..e218090950 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1216,7 +1216,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
: (insertFirst ? allRows : Set.of());
Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.nowLong()));
- assertEquals(expected.size(), res.size());
+ assertEquals(allRows.size(), res.size());
for (BinaryRow e : expected) {
// TODO: IGNITE-19430 - should there be an assertion in the next line?
res.contains(e);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 6c1b529c6d..5aed2c8c28 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -17,16 +17,29 @@
package org.apache.ignite.internal.table.distributed.storage;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder;
+import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithoutRestoreOrder;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import java.util.List;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -81,4 +94,67 @@ public class InternalTableImplTest {
verify(safeTime0).close();
verify(storageIndex0).close();
}
+
+ @Test
+ void testRowBatchByPartitionId() {
+ InternalTableImpl internalTable = new InternalTableImpl(
+ "test",
+ 1,
+ Int2ObjectMaps.emptyMap(),
+ 3,
+ s -> mock(ClusterNode.class),
+ mock(TxManager.class),
+ mock(MvTableStorage.class),
+ mock(TxStateTableStorage.class),
+ mock(ReplicaService.class),
+ mock(HybridClock.class)
+ );
+
+ List<BinaryRowEx> originalRows = List.of(
+ // Rows for partition 0.
+ createBinaryRows(0),
+ createBinaryRows(0),
+ // Rows for partition 1.
+ createBinaryRows(1),
+ // Rows for partition 2.
+ createBinaryRows(2),
+ createBinaryRows(2),
+ createBinaryRows(2)
+ );
+
+ // We will get batches for processing and check them.
+ Int2ObjectMap<RowBatch> rowBatchByPartitionId = internalTable.toRowBatchByPartitionId(originalRows);
+
+ assertThat(rowBatchByPartitionId.get(0).requestedRows, hasSize(2));
+ assertThat(rowBatchByPartitionId.get(0).originalRowOrder, contains(0, 1));
+
+ assertThat(rowBatchByPartitionId.get(1).requestedRows, hasSize(1));
+ assertThat(rowBatchByPartitionId.get(1).originalRowOrder, contains(2));
+
+ assertThat(rowBatchByPartitionId.get(2).requestedRows, hasSize(3));
+ assertThat(rowBatchByPartitionId.get(2).originalRowOrder, contains(3, 4, 5));
+
+ // Collect the result and check it.
+ rowBatchByPartitionId.get(0).resultFuture = completedFuture(List.of(originalRows.get(0), originalRows.get(1)));
+ rowBatchByPartitionId.get(1).resultFuture = completedFuture(List.of(originalRows.get(2)));
+ rowBatchByPartitionId.get(2).resultFuture = completedFuture(List.of(originalRows.get(3), originalRows.get(4), originalRows.get(5)));
+
+ assertThat(
+ collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values()),
+ willBe(equalTo(originalRows))
+ );
+
+ assertThat(
+ collectMultiRowsResponsesWithoutRestoreOrder(rowBatchByPartitionId.values()),
+ willBe(hasItems(originalRows.toArray(BinaryRowEx[]::new)))
+ );
+ }
+
+ private static BinaryRowEx createBinaryRows(int colocationHash) {
+ BinaryRowEx rowEx = mock(BinaryRowEx.class);
+
+ when(rowEx.colocationHash()).thenReturn(colocationHash);
+
+ return rowEx;
+ }
}