You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/11 13:28:30 UTC
[ignite-3] branch main updated: IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e75022ec99 IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)
e75022ec99 is described below
commit e75022ec99689a08a042e2ecedec6e4211dbfecf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Oct 11 16:28:25 2022 +0300
IGNITE-17259 Populate ReplicaListener with handlers for RO requests (#1165)
---
.../internal/table/distributed/TableManager.java | 34 +++----
.../table/distributed/TableMessageGroup.java | 26 +++++-
...st.java => ReadOnlyMultiRowReplicaRequest.java} | 25 ++----
...> ReadOnlyScanRetrieveBatchReplicaRequest.java} | 23 +----
...t.java => ReadOnlySingleRowReplicaRequest.java} | 25 ++----
.../request/ScanRetrieveBatchReplicaRequest.java | 56 ++++++++++--
.../replicator/PartitionReplicaListener.java | 100 ++++++++++++++++++++-
7 files changed, 200 insertions(+), 89 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 78cdc720e5..9d10eb31bb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1668,15 +1668,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return true;
}
- int part = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
+ int partId = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
UUID tblId = extractTableId(pendingAssignmentsWatchEvent.key(), PENDING_ASSIGNMENTS_PREFIX);
- String partId = partitionRaftGroupName(tblId, part);
+ String grpId = partitionRaftGroupName(tblId, partId);
// Assignments of the pending rebalance that we received through the meta storage watch mechanism.
Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
- var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join();
+ var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(grpId)).join();
assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision()
: "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision.";
@@ -1691,12 +1691,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
+ tbl.internalTable().storage().getClass().getName();
// Stable assignments from the meta store, which revision is bounded by the current pending event.
- byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId),
+ byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(grpId),
pendingAssignmentsWatchEvent.revision()).join().value();
Set<ClusterNode> assignments = stableAssignments == null
// This is for the case when the first rebalance occurs.
- ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
+ ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
: ByteUtils.fromBytes(stableAssignments);
ClusterNode localMember = raftMgr.topologyService().localMember();
@@ -1710,10 +1710,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
try {
LOG.info("Received update on pending assignments. Check if new raft group should be started"
+ " [key={}, partition={}, table={}, localMemberAddress={}]",
- pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
+ pendingAssignmentsWatchEvent.key(), partId, tbl.name(), localMember.address());
if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
- MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part);
+ MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(partId);
RaftGroupOptions groupOptions = groupOptionsForPartition(
tbl.internalTable(),
@@ -1724,7 +1724,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
RaftGroupListener raftGrpLsnr = new PartitionListener(
partitionStorage,
- tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(part),
+ tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
txManager,
primaryIndex
);
@@ -1732,16 +1732,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
metaStorageMgr,
tblCfg,
+ grpId,
partId,
- part,
busyLock,
- movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
+ movePartition(() -> tbl.internalTable().partitionRaftGroupService(partId)),
TableManager.this::calculateAssignments,
rebalanceScheduler
);
raftMgr.startRaftGroupNode(
- partId,
+ grpId,
assignments,
raftGrpLsnr,
raftGrpEvtsLsnr,
@@ -1750,16 +1750,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
if (replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
- MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(part);
+ MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(partId);
- replicaMgr.startReplica(partId,
+ replicaMgr.startReplica(grpId,
new PartitionReplicaListener(
partitionStorage,
- tbl.internalTable().partitionRaftGroupService(part),
+ tbl.internalTable().partitionRaftGroupService(partId),
txManager,
lockMgr,
- part,
partId,
+ grpId,
tblId,
primaryIndex,
clock
@@ -1778,7 +1778,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var newNodes = newPeers.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
- RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(part);
+ RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(partId);
IgniteBiTuple<Peer, Long> leaderWithTerm = partGrpSvc.refreshAndGetLeaderWithTerm().join();
@@ -1786,7 +1786,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (localMember.address().equals(leaderWithTerm.get1().address())) {
LOG.info("Current node={} is the leader of partition raft group={}. "
+ "Initiate rebalance process for partition={}, table={}",
- localMember.address(), partId, part, tbl.name());
+ localMember.address(), grpId, partId, tbl.name());
partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index f89f6fb6fd..3eb11ed25f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -26,11 +26,14 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
-import org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
-import org.apache.ignite.internal.table.distributed.replication.request.ScanRetrieveBatchReplicaRequest;
import org.apache.ignite.network.annotations.MessageGroup;
/**
@@ -54,12 +57,12 @@ public interface TableMessageGroup {
short RW_DUAL_ROW_REPLICA_REQUEST = 2;
/**
- * Message type for {@link ScanRetrieveBatchReplicaRequest}.
+ * Message type for {@link ReadWriteScanRetrieveBatchReplicaRequest}.
*/
short RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 3;
/**
- * Message type for {@link ScanCloseReplicaRequest}.
+ * Message type for {@link ReadWriteScanCloseReplicaRequest}.
*/
short RW_SCAN_CLOSE_REPLICA_REQUEST = 4;
@@ -73,6 +76,21 @@ public interface TableMessageGroup {
*/
short HAS_DATA_RESPONSE = 6;
+ /**
+ * Message type for {@link ReadOnlySingleRowReplicaRequest}.
+ */
+ short RO_SINGLE_ROW_REPLICA_REQUEST = 7;
+
+ /**
+ * Message type for {@link ReadOnlyMultiRowReplicaRequest}.
+ */
+ short RO_MULTI_ROW_REPLICA_REQUEST = 8;
+
+ /**
+ * Message type for {@link ReadOnlyScanRetrieveBatchReplicaRequest}.
+ */
+ short RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 9;
+
/**
* Message type for {@link SnapshotMetaRequest}.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
similarity index 54%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
index 4c147793af..ac377e98bf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyMultiRowReplicaRequest.java
@@ -17,27 +17,12 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Scan retrieve batch replica request.
+ * Read only multi row replica request.
*/
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
- /** Batch size. */
- int batchSize();
-
- /** The id uniquely determines a cursor for the transaction. */
- long scanId();
-
- /**
- * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
- * ignored.
- *
- * @return Row filter predicate.
- */
- @Marshallable
- Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_MULTI_ROW_REPLICA_REQUEST)
+public interface ReadOnlyMultiRowReplicaRequest extends MultipleRowReplicaRequest, ReadOnlyReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
index 4c147793af..37c1d37239 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -17,27 +17,12 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
/**
* Scan retrieve batch replica request.
*/
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
- /** Batch size. */
- int batchSize();
-
- /** The id uniquely determines a cursor for the transaction. */
- long scanId();
-
- /**
- * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
- * ignored.
- *
- * @return Row filter predicate.
- */
- @Marshallable
- Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
+public interface ReadOnlyScanRetrieveBatchReplicaRequest extends ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
similarity index 54%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
index 4c147793af..6589630d9d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlySingleRowReplicaRequest.java
@@ -17,27 +17,12 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.util.function.Predicate;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Scan retrieve batch replica request.
+ * Read only single row replica request.
*/
-public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
- /** Batch size. */
- int batchSize();
-
- /** The id uniquely determines a cursor for the transaction. */
- long scanId();
-
- /**
- * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
- * ignored.
- *
- * @return Row filter predicate.
- */
- @Marshallable
- Predicate<BinaryRow> rowFilter();
+@Transferable(TableMessageGroup.RO_SINGLE_ROW_REPLICA_REQUEST)
+public interface ReadOnlySingleRowReplicaRequest extends SingleRowReplicaRequest, ReadOnlyReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
index 4c147793af..70d53c159d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.util.function.Predicate;
+import java.util.BitSet;
+import java.util.UUID;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.network.annotations.Marshallable;
/**
@@ -33,11 +35,53 @@ public interface ScanRetrieveBatchReplicaRequest extends ReplicaRequest {
long scanId();
/**
- * Gets a scan row filter. The filter has a sense only for the first request, for the second one and the followings the field is
- * ignored.
+ * Gets an index to use for the retrieve request.
*
- * @return Row filter predicate.
+ * @return Index id.
*/
@Marshallable
- Predicate<BinaryRow> rowFilter();
+ UUID indexToUse();
+
+ /**
+ * Gets a key which is used for exact comparison in the index.
+ *
+ * @return Key to search.
+ */
+ @Marshallable
+ BinaryTuple exactKey();
+
+ /**
+ * Gets a lower bound to choose entries from {@link SortedIndexStorage}. Exclusivity is controlled by a {@link
+ * SortedIndexStorage#GREATER_OR_EQUAL} or {@link SortedIndexStorage#GREATER} flag. {@code null} means unbounded.
+ *
+ * @return lower bound.
+ */
+ @Marshallable
+ BinaryTuple lowerBound();
+
+ /**
+ * Gets an upper bound to choose entries from {@link SortedIndexStorage}. Exclusivity is controlled by a {@link
+ * SortedIndexStorage#LESS} or {@link SortedIndexStorage#LESS_OR_EQUAL} flag. {@code null} means unbounded.
+ *
+ * @return upper bound.
+ */
+ @Marshallable
+ BinaryTuple upperBound();
+
+ /**
+ * Gets control flags for {@link SortedIndexStorage}. {@link SortedIndexStorage#GREATER} | {@link SortedIndexStorage#LESS} by default.
+ * Other available values are {@link SortedIndexStorage#GREATER_OR_EQUAL}, {@link SortedIndexStorage#LESS_OR_EQUAL}.
+ * TODO: IGNITE-17748 Refresh the summary when the meaning changes, after the issue is implemented.
+ *
+ * @return Flags to determine a scan order.
+ */
+ int flags();
+
+ /**
+ * Gets bitset to include columns.
+ *
+ * @return Bitset to include columns.
+ */
+ @Marshallable
+ BitSet columnsToInclude();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1aa567cec5..cf429c2bfd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -51,12 +51,16 @@ import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
@@ -184,12 +188,100 @@ public class PartitionReplicaListener implements ReplicaListener {
return processTxFinishAction((TxFinishReplicaRequest) request);
} else if (request instanceof TxCleanupReplicaRequest) {
return processTxCleanupAction((TxCleanupReplicaRequest) request);
+ } else if (request instanceof ReadOnlySingleRowReplicaRequest) {
+ return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+ } else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
+ return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+ } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
+ return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
});
}
+ /**
+ * Retrieves the next batch of rows from the scan cursor of a read only transaction, opening the cursor on first use.
+ *
+ * @param request Read only retrieve batch request; supplies the transaction id, scan id and batch size.
+ * @return Completed future with at most {@code batchSize} rows; entries resolved to {@code null} are skipped.
+ */
+ private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+ //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
+ PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+ id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
+
+ while (batchRows.size() < batchCount && cursor.hasNext()) {
+ BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), null);
+
+ if (resolvedReadResult != null) {
+ batchRows.add(resolvedReadResult);
+ }
+ }
+
+ return CompletableFuture.completedFuture(batchRows);
+ }
+
+ /**
+ * Processes a single-row lookup request issued by a read only transaction.
+ *
+ * @param request Read only single entry request; any type other than {@link RequestType#RO_GET} is rejected.
+ * @return Completed future with the row read at the request timestamp, or {@code null} if nothing is resolved for the key.
+ */
+ private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+ ByteBuffer searchKey = request.binaryRow().keySlice();
+
+ UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+ if (request.requestType() != RequestType.RO_GET) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+ }
+
+ //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+ RowId rowId = rowIdByKey(indexId, searchKey);
+
+ BinaryRow result = rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null;
+
+ return CompletableFuture.completedFuture(result);
+ }
+
+ /**
+ * Processes a multi-row lookup request issued by a read only transaction.
+ *
+ * @param request Read only multiple entries request; any type other than {@link RequestType#RO_GET_ALL} is rejected.
+ * @return Completed future with one entry per requested key ({@code null} where nothing is resolved for the key).
+ */
+ private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+ Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br -> br.keySlice()).collect(
+ Collectors.toList());
+
+ UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+ if (request.requestType() != RequestType.RO_GET_ALL) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ IgniteStringFormatter.format("Unknown multi request [actionType={}]", request.requestType()));
+ }
+
+ ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+ for (ByteBuffer searchKey : keyRows) {
+ //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+ RowId rowId = rowIdByKey(indexId, searchKey);
+
+ result.add(rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null);
+ }
+
+ return CompletableFuture.completedFuture(result);
+ }
+
/**
* Close all cursors connected with a transaction.
*
@@ -264,11 +356,13 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockManager.acquire(txId, new LockKey(tableId), LockMode.S).thenCompose(tblLock -> {
ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
- PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId, id -> mvDataStorage.scan(request.rowFilter(),
- HybridTimestamp.MAX_VALUE));
+ //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
+ PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+ id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
- for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+ while (batchRows.size() < batchCount && cursor.hasNext()) {
BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), txId);
+
if (resolvedReadResult != null) {
batchRows.add(resolvedReadResult);
}