You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/13 14:41:57 UTC
[ignite-3] branch main updated: IGNITE-17849 Remove filter from partition fullscan (#1203)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b8a34e377e IGNITE-17849 Remove filter from partition fullscan (#1203)
b8a34e377e is described below
commit b8a34e377e5f3bd5ee5e61901a3bcdebe60d7198
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Oct 13 18:41:51 2022 +0400
IGNITE-17849 Remove filter from partition fullscan (#1203)
---
.../internal/storage/MvPartitionStorage.java | 4 +-
.../storage/AbstractMvPartitionStorageTest.java | 27 +++++-----
.../storage/AbstractMvTableStorageTest.java | 4 +-
.../storage/impl/TestMvPartitionStorage.java | 41 ++++------------
.../mv/AbstractPageMemoryMvPartitionStorage.java | 57 +++++++---------------
.../AbstractPageMemoryMvPartitionStorageTest.java | 4 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 6 +--
.../distributed/ItInternalTableScanTest.java | 8 +--
.../replicator/PartitionReplicaListener.java | 10 ++--
9 files changed, 53 insertions(+), 108 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 2abd01450a..8e742f54d3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.storage;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.function.Predicate;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.util.Cursor;
@@ -173,13 +172,12 @@ public interface MvPartitionStorage extends AutoCloseable {
/**
* Scans the partition and returns a cursor of values at the given timestamp.
*
- * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
* @param timestamp Timestamp. Can't be {@code null}.
* @return Cursor.
* @throws TxIdMismatchException If there's another pending update associated with different transaction id.
* @throws StorageException If failed to read data from the storage.
*/
- PartitionTimestampCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
+ PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException;
/**
* Returns a row id, existing in the storage, that's greater than or equal to the lower bound. {@code null} if not found.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index dccd12b700..09e1b2cc3a 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -40,7 +40,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
@@ -85,8 +84,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
/**
* Scans partition.
*/
- protected PartitionTimestampCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
- return storage.scan(filter, timestamp);
+ protected PartitionTimestampCursor scan(HybridTimestamp timestamp) {
+ return storage.scan(timestamp);
}
/**
@@ -158,7 +157,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
@Test
public void testScanOverEmpty() throws Exception {
- assertEquals(List.of(), convert(scan(row -> true, clock.now())));
+ assertEquals(List.of(), convert(scan(clock.now())));
}
/**
@@ -269,7 +268,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
/**
- * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate, HybridTimestamp)}.
+ * Tests basic invariants of {@link MvPartitionStorage#scan(HybridTimestamp)}.
*/
@Test
public void testScan() throws Exception {
@@ -296,13 +295,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
HybridTimestamp ts5 = clock.now();
// Full scan with various timestamp values.
- assertEquals(List.of(), convert(scan(row -> true, ts1)));
+ assertEquals(List.of(), convert(scan(ts1)));
- assertEquals(List.of(value1), convert(scan(row -> true, ts2)));
- assertEquals(List.of(value1), convert(scan(row -> true, ts3)));
+ assertEquals(List.of(value1), convert(scan(ts2)));
+ assertEquals(List.of(value1), convert(scan(ts3)));
- assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4)));
- assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5)));
+ assertEquals(List.of(value1, value2), convert(scan(ts4)));
+ assertEquals(List.of(value1, value2), convert(scan(ts5)));
}
@Test
@@ -317,7 +316,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
RowId rowId2 = insert(binaryRow(new TestKey(2, "2"), value2), txId);
commitWrite(rowId2, clock.now());
- try (PartitionTimestampCursor cursor = scan(row -> true, HybridTimestamp.MAX_VALUE)) {
+ try (PartitionTimestampCursor cursor = scan(HybridTimestamp.MAX_VALUE)) {
assertTrue(cursor.hasNext());
assertTrue(cursor.hasNext());
@@ -370,7 +369,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
addWrite(rowId2, binaryRow22, newTransactionId());
- try (PartitionTimestampCursor cursor = scan(row -> true, clock.now())) {
+ try (PartitionTimestampCursor cursor = scan(clock.now())) {
assertThrows(IllegalStateException.class, () -> cursor.committed(commitTs1));
assertTrue(cursor.hasNext());
@@ -832,7 +831,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
commitAbortAndAddUncommitted();
- try (Cursor<ReadResult> cursor = storage.scan(k -> true, clock.now())) {
+ try (Cursor<ReadResult> cursor = storage.scan(clock.now())) {
BinaryRow foundRow = cursor.next().binaryRow();
assertRowMatches(foundRow, binaryRow3);
@@ -1103,7 +1102,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
return null;
});
- try (PartitionTimestampCursor cursor = storage.scan(r -> true, clock.now())) {
+ try (PartitionTimestampCursor cursor = storage.scan(clock.now())) {
assertTrue(cursor.hasNext());
ReadResult next = cursor.next();
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index c56ee95354..d5ef2b8509 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -137,8 +137,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThrows(IllegalArgumentException.class, () -> partitionStorage0.read(rowId1, HybridTimestamp.MAX_VALUE));
assertThat(unwrap(partitionStorage1.read(rowId1, HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData1))));
- assertThat(toList(partitionStorage0.scan(row -> true, HybridTimestamp.MAX_VALUE)), contains(unwrap(testData0)));
- assertThat(toList(partitionStorage1.scan(row -> true, HybridTimestamp.MAX_VALUE)), contains(unwrap(testData1)));
+ assertThat(toList(partitionStorage0.scan(HybridTimestamp.MAX_VALUE)), contains(unwrap(testData0)));
+ assertThat(toList(partitionStorage1.scan(HybridTimestamp.MAX_VALUE)), contains(unwrap(testData1)));
}
/**
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 24a18b3466..846cfe78f2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BiConsumer;
-import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -197,7 +196,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
VersionChain versionChain = map.get(rowId);
- return read(versionChain, timestamp, null, null);
+ return read(versionChain, timestamp, null);
}
/**
@@ -206,14 +205,12 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
* @param versionChain Version chain.
* @param timestamp Timestamp or {@code null} if transaction id is defined.
* @param txId Transaction id or {@code null} if timestamp is defined.
- * @param filter Key filter.
* @return Read result.
*/
private static ReadResult read(
VersionChain versionChain,
@Nullable HybridTimestamp timestamp,
- @Nullable UUID txId,
- @Nullable Predicate<BinaryRow> filter
+ @Nullable UUID txId
) {
assert timestamp == null ^ txId == null;
@@ -224,10 +221,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
if (timestamp == null) {
// Search by transaction id.
- if (filter != null && !filter.test(versionChain.row)) {
- return ReadResult.EMPTY;
- }
-
if (versionChain.txId != null && !versionChain.txId.equals(txId)) {
throw new TxIdMismatchException(txId, versionChain.txId);
}
@@ -243,19 +236,14 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// We *only* have a write-intent, return it.
BinaryRow binaryRow = cur.row;
- if (filter != null && !filter.test(binaryRow)) {
- return ReadResult.EMPTY;
- }
-
- return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null
- );
+ return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null);
}
// Move to first commit.
cur = cur.next;
}
- return walkVersionChain(versionChain, timestamp, filter, cur);
+ return walkVersionChain(versionChain, timestamp, cur);
}
private static ReadResult versionChainToReadResult(VersionChain versionChain, boolean fillLastCommittedTs) {
@@ -276,12 +264,10 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
*
* @param chainHead Version chain head.
* @param timestamp Timestamp.
- * @param filter Key filter.
* @param firstCommit First commit chain element.
* @return Read result.
*/
- private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimestamp timestamp, @Nullable Predicate<BinaryRow> filter,
- VersionChain firstCommit) {
+ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimestamp timestamp, VersionChain firstCommit) {
boolean hasWriteIntent = chainHead.ts == null;
if (hasWriteIntent && timestamp.compareTo(firstCommit.ts) > 0) {
@@ -289,13 +275,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// So we just return write-intent.
BinaryRow binaryRow = chainHead.row;
- if (filter != null && !filter.test(binaryRow)) {
- return ReadResult.EMPTY;
- }
-
return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, chainHead.commitPartitionId,
- firstCommit.ts
- );
+ firstCommit.ts);
}
VersionChain cur = firstCommit;
@@ -307,10 +288,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
// This commit has timestamp matching the query ts, meaning that commit is the one we are looking for.
BinaryRow binaryRow = cur.row;
- if (filter != null && !filter.test(binaryRow)) {
- return ReadResult.EMPTY;
- }
-
return ReadResult.createFromCommitted(binaryRow, cur.ts);
}
@@ -331,7 +308,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public PartitionTimestampCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) {
Iterator<VersionChain> iterator = map.values().iterator();
return new PartitionTimestampCursor() {
@@ -348,7 +325,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
}
// We don't check if row conforms to the key filter here, because we've already checked it.
- ReadResult read = read(currentChain, timestamp, null, null);
+ ReadResult read = read(currentChain, timestamp, null);
if (read.transactionId() == null) {
return read.binaryRow();
@@ -372,7 +349,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
while (iterator.hasNext()) {
VersionChain chain = iterator.next();
- ReadResult readResult = read(chain, timestamp, null, filter);
+ ReadResult readResult = read(chain, timestamp, null);
if (!readResult.isEmpty()) {
currentChain = chain;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index f9da31bd26..e6a9da76da 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -378,21 +378,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
// We *only* have a write-intent, return it.
RowVersion rowVersion = readRowVersion(headLink, ALWAYS_LOAD_VALUE);
- assert rowVersion.isUncommitted();
-
- UUID transactionId = versionChain.transactionId();
- UUID commitTableId = versionChain.commitTableId();
- int commitPartitionId = versionChain.commitPartitionId();
-
- BinaryRow row;
-
- if (rowVersion.isTombstone()) {
- row = null;
- } else {
- row = new ByteBufferRow(rowVersion.value());
- }
-
- return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, null);
+ return writeIntentToResult(versionChain, rowVersion, null);
}
}
@@ -428,21 +414,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
// So we just return write-intent.
RowVersion rowVersion = readRowVersion(chainHead.headLink(), ALWAYS_LOAD_VALUE);
- assert rowVersion.isUncommitted();
-
- UUID transactionId = chainHead.transactionId();
- UUID commitTableId = chainHead.commitTableId();
- int commitPartitionId = chainHead.commitPartitionId();
-
- BinaryRow row;
-
- if (rowVersion.isTombstone()) {
- row = null;
- } else {
- row = new ByteBufferRow(rowVersion.value());
- }
-
- return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, firstCommit.timestamp());
+ return writeIntentToResult(chainHead, rowVersion, firstCommit.timestamp());
}
RowVersion curCommit = firstCommit;
@@ -475,6 +447,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return ReadResult.EMPTY;
}
+ private ReadResult writeIntentToResult(VersionChain chain, RowVersion rowVersion, @Nullable HybridTimestamp lastCommittedTimestamp) {
+ assert rowVersion.isUncommitted();
+
+ UUID transactionId = chain.transactionId();
+ UUID commitTableId = chain.commitTableId();
+ int commitPartitionId = chain.commitPartitionId();
+
+ BinaryRow row = rowVersionToBinaryRow(rowVersion);
+
+ return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, lastCommittedTimestamp);
+ }
+
private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
byte[] rowBytes = rowBytes(row);
@@ -714,7 +698,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
@Override
- public PartitionTimestampCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
assert timestamp != null;
Cursor<VersionChain> treeCursor;
@@ -725,7 +709,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
throw new StorageException("Find failed", e);
}
- return new TimestampCursor(treeCursor, keyFilter, timestamp);
+ return new TimestampCursor(treeCursor, timestamp);
}
private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
@@ -788,8 +772,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
private class TimestampCursor implements PartitionTimestampCursor {
private final Cursor<VersionChain> treeCursor;
- private final Predicate<BinaryRow> keyFilter;
-
private final HybridTimestamp timestamp;
@Nullable
@@ -800,9 +782,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
private boolean iterationExhausted = false;
- public TimestampCursor(Cursor<VersionChain> treeCursor, Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) {
+ public TimestampCursor(Cursor<VersionChain> treeCursor, HybridTimestamp timestamp) {
this.treeCursor = treeCursor;
- this.keyFilter = keyFilter;
this.timestamp = timestamp;
}
@@ -832,10 +813,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
continue;
}
- if (keyFilter != null && !keyFilter.test(res.binaryRow())) {
- continue;
- }
-
nextRead = res;
currentChain = chain;
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
index 9c989f430f..42c22c02fa 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
@@ -86,7 +86,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest extends AbstractMvPartit
insert(longRow, txId);
- try (PartitionTimestampCursor cursor = storage.scan(row -> true, HybridTimestamp.MAX_VALUE)) {
+ try (PartitionTimestampCursor cursor = storage.scan(HybridTimestamp.MAX_VALUE)) {
BinaryRow foundRow = cursor.next().binaryRow();
assertRowMatches(foundRow, longRow);
@@ -101,7 +101,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest extends AbstractMvPartit
commitWrite(rowId, clock.now());
- try (PartitionTimestampCursor cursor = storage.scan(row -> true, HybridTimestamp.MAX_VALUE)) {
+ try (PartitionTimestampCursor cursor = storage.scan(HybridTimestamp.MAX_VALUE)) {
BinaryRow foundRow = cursor.next().binaryRow();
assertRowMatches(foundRow, longRow);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index aba18f52b1..dc918fe043 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -876,7 +876,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public PartitionTimestampCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
assert timestamp != null;
RocksIterator it = db.newIterator(cf, scanReadOptions);
@@ -967,10 +967,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
continue;
}
- if (keyFilter != null && !keyFilter.test(readResult.binaryRow())) {
- continue;
- }
-
next = readResult;
currentRowId = rowId;
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 3c9ca940e4..714b50818e 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -163,7 +163,7 @@ public class ItInternalTableScanTest {
AtomicReference<Throwable> gotException = new AtomicReference<>();
- when(mockStorage.scan(any(), any(HybridTimestamp.class))).thenAnswer(invocation -> {
+ when(mockStorage.scan(any(HybridTimestamp.class))).thenAnswer(invocation -> {
var cursor = mock(PartitionTimestampCursor.class);
when(cursor.hasNext()).thenAnswer(hnInvocation -> true);
@@ -221,7 +221,7 @@ public class ItInternalTableScanTest {
AtomicReference<Throwable> gotException = new AtomicReference<>();
- when(mockStorage.scan(any(), any(HybridTimestamp.class))).thenThrow(new StorageException("Some storage exception"));
+ when(mockStorage.scan(any(HybridTimestamp.class))).thenThrow(new StorageException("Some storage exception"));
internalTbl.scan(0, null).subscribe(new Subscriber<>() {
@@ -376,7 +376,7 @@ public class ItInternalTableScanTest {
List<BinaryRow> retrievedItems = Collections.synchronizedList(new ArrayList<>());
- when(mockStorage.scan(any(), any(HybridTimestamp.class))).thenAnswer(invocation -> {
+ when(mockStorage.scan(any(HybridTimestamp.class))).thenAnswer(invocation -> {
var cursor = mock(PartitionTimestampCursor.class);
when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
@@ -444,7 +444,7 @@ public class ItInternalTableScanTest {
// and avoids the race between closing the cursor and stopping the node.
CountDownLatch subscriberFinishedLatch = new CountDownLatch(1);
- lenient().when(mockStorage.scan(any(), any(HybridTimestamp.class))).thenAnswer(invocation -> {
+ lenient().when(mockStorage.scan(any(HybridTimestamp.class))).thenAnswer(invocation -> {
var cursor = mock(PartitionTimestampCursor.class);
doAnswer(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index cf429c2bfd..259df84a01 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -214,9 +214,8 @@ public class PartitionReplicaListener implements ReplicaListener {
ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
- //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
- PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
- id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
+ @SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+ id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
while (batchRows.size() < batchCount && cursor.hasNext()) {
BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), null);
@@ -356,9 +355,8 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockManager.acquire(txId, new LockKey(tableId), LockMode.S).thenCompose(tblLock -> {
ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
- //TODO: IGNITE-17849 Remove this always true filter after the storage API will be changed.
- PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
- id -> mvDataStorage.scan(row -> true, HybridTimestamp.MAX_VALUE));
+ @SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
+ id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
while (batchRows.size() < batchCount && cursor.hasNext()) {
BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), txId);