You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 16:55:49 UTC
[ignite-3] 01/02: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ca20e3c14982e36779ab9a929ca6d6ddb774ac36
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Oct 17 18:07:50 2022 +0400
IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)
---
.../apache/ignite/internal/storage/ReadResult.java | 10 +-
.../ignite/internal/storage/ReadResultTest.java | 36 ++
.../storage/AbstractMvPartitionStorageTest.java | 150 +++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 244 +++------
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 585 ++++++++++-----------
5 files changed, 516 insertions(+), 509 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 735f7dfb11..0bd10c4adb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -32,7 +32,7 @@ public class ReadResult {
/** Empty read result. */
public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
- /** Data. */
+ /** Data. {@code null} iff the result is empty (i.e. no row exists or it is a tombstone). */
private final @Nullable BinaryRow binaryRow;
/** Transaction id. Not {@code null} iff this is a write-intent. */
@@ -90,11 +90,11 @@ public class ReadResult {
}
/**
- * Returns binary row representation of the data.
+ * Returns binary row representation of the data, {@code null} if {@link #isEmpty()}.
*
- * @return Binary row representation of the data.
+ * @return Binary row representation of the data, {@code null} if {@link #isEmpty()}.
*/
- public BinaryRow binaryRow() {
+ public @Nullable BinaryRow binaryRow() {
return binaryRow;
}
@@ -155,6 +155,6 @@ public class ReadResult {
}
public boolean isEmpty() {
- return this == EMPTY;
+ return binaryRow == null;
}
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
new file mode 100644
index 0000000000..9310392568
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class ReadResultTest {
+ @Test
+ void resultInEmptyConstantIsEmpty() {
+ assertTrue(ReadResult.EMPTY.isEmpty());
+ }
+
+ @Test
+ void resultWithNullRowIsEmpty() {
+ ReadResult result = ReadResult.createFromCommitted(null, null);
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 98e25e1f2f..76c6db0b00 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Base test for MV partition storages.
@@ -155,9 +157,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertNull(read(rowId, clock.now()));
}
- @Test
- public void testScanOverEmpty() throws Exception {
- assertEquals(List.of(), convert(scan(clock.now())));
+ @ParameterizedTest
+ @EnumSource
+ public void testScanOverEmpty(ScanTimestampProvider tsProvider) throws Exception {
+ assertEquals(List.of(), convert(scan(tsProvider.scanTimestamp(clock))));
}
/**
@@ -302,8 +305,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertEquals(List.of(value1, value2), convert(scan(ts4)));
assertEquals(List.of(value1, value2), convert(scan(ts5)));
+
+ assertEquals(List.of(value1, value2), convert(scan(HybridTimestamp.MAX_VALUE)));
}
+ @SuppressWarnings("ConstantConditions")
@Test
public void testTransactionScanCursorInvariants() throws Exception {
TestValue value1 = new TestValue(10, "xxx");
@@ -338,6 +344,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
}
+ @SuppressWarnings("ConstantConditions")
@Test
public void testTimestampScanCursorInvariants() throws Exception {
TestValue value11 = new TestValue(10, "xxx");
@@ -409,6 +416,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertFalse(cursor.hasNext());
assertFalse(cursor.hasNext());
+ assertThrows(NoSuchElementException.class, () -> cursor.next());
+
assertThrows(IllegalStateException.class, () -> cursor.committed(commitTs1));
}
}
@@ -827,11 +836,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
});
}
- @Test
- void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) throws Exception {
commitAbortAndAddUncommitted();
- try (Cursor<ReadResult> cursor = storage.scan(clock.now())) {
+ try (Cursor<ReadResult> cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
BinaryRow foundRow = cursor.next().binaryRow();
assertRowMatches(foundRow, binaryRow3);
@@ -1086,23 +1096,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
}
- @Test
- void testScanWithWriteIntent() throws Exception {
- RowId rowId1 = new RowId(PARTITION_ID);
-
- HybridTimestamp commit1ts = clock.now();
-
- storage.runConsistently(() -> {
- addWrite(rowId1, binaryRow, newTransactionId());
-
- commitWrite(rowId1, commit1ts);
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ void testScanWithWriteIntent(ScanTimestampProvider tsProvider) throws Exception {
+ HybridTimestamp commitTs = addCommittedVersionAndWriteIntent();
- addWrite(rowId1, binaryRow2, newTransactionId());
-
- return null;
- });
-
- try (PartitionTimestampCursor cursor = storage.scan(clock.now())) {
+ try (PartitionTimestampCursor cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
assertTrue(cursor.hasNext());
ReadResult next = cursor.next();
@@ -1111,12 +1110,30 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertRowMatches(next.binaryRow(), binaryRow2);
- BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+ BinaryRow committedRow = cursor.committed(commitTs);
assertRowMatches(committedRow, binaryRow);
}
}
+ private HybridTimestamp addCommittedVersionAndWriteIntent() {
+ RowId rowId = new RowId(PARTITION_ID);
+
+ HybridTimestamp commitTs = clock.now();
+
+ storage.runConsistently(() -> {
+ addWrite(rowId, binaryRow, newTransactionId());
+
+ commitWrite(rowId, commitTs);
+
+ addWrite(rowId, binaryRow2, newTransactionId());
+
+ return null;
+ });
+
+ return commitTs;
+ }
+
@Test
void testScanVersionsWithWriteIntent() throws Exception {
RowId rowId = new RowId(PARTITION_ID, 100, 0);
@@ -1141,8 +1158,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertEquals(2, list.size());
- for (int i = 0; i < list.size(); i++) {
- assertEquals(key, list.get(i).getKey());
+ for (IgniteBiTuple<TestKey, TestValue> objects : list) {
+ assertEquals(key, objects.getKey());
}
assertEquals(value2, list.get(0).getValue());
@@ -1235,6 +1252,72 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
}
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ public void scanCursorHasNextReturnsFalseEachTimeAfterExhaustion(ScanTimestampProvider tsProvider) throws Exception {
+ RowId rowId = insert(binaryRow, txId);
+ commitWrite(rowId, clock.now());
+
+ try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+ cursor.next();
+
+ assertFalse(cursor.hasNext());
+ //noinspection ConstantConditions
+ assertFalse(cursor.hasNext());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+ testScanDoesNotSeeTombstones(tsProvider, false);
+ }
+
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) throws Exception {
+ testScanDoesNotSeeTombstones(tsProvider, true);
+ }
+
+ private void testScanDoesNotSeeTombstones(ScanTimestampProvider scantsProvider, boolean commitRemoval) throws Exception {
+ RowId rowId = insert(binaryRow, txId);
+ commitWrite(rowId, clock.now());
+
+ addWrite(rowId, null, newTransactionId());
+ if (commitRemoval) {
+ commitWrite(rowId, clock.now());
+ }
+
+ assertScanSeesNothing(scantsProvider);
+ }
+
+ private void assertScanSeesNothing(ScanTimestampProvider scanTsProvider) throws Exception {
+ try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+ assertFalse(cursor.hasNext());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(ScanTimestampProvider.class)
+ void committedMethodCallDoesNotInterfereWithIteratingOverScanCursor(ScanTimestampProvider scanTsProvider) throws Exception {
+ RowId rowId1 = insert(binaryRow, txId);
+ HybridTimestamp commitTs1 = clock.now();
+ commitWrite(rowId1, commitTs1);
+
+ insert(binaryRow2, txId);
+
+ try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+ cursor.next();
+
+ cursor.committed(commitTs1);
+
+ ReadResult result2 = cursor.next();
+ assertRowMatches(result2.binaryRow(), binaryRow2);
+
+ assertFalse(cursor.hasNext());
+ }
+ }
+
/**
* Returns row id that is lexicographically smaller (by the value of one) than the argument.
*
@@ -1253,4 +1336,21 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
return new RowId(value.partitionId(), msb, lsb);
}
+
+ private enum ScanTimestampProvider {
+ NOW {
+ @Override
+ HybridTimestamp scanTimestamp(HybridClock clock) {
+ return clock.now();
+ }
+ },
+ MAX_VALUE {
+ @Override
+ HybridTimestamp scanTimestamp(HybridClock clock) {
+ return HybridTimestamp.MAX_VALUE;
+ }
+ };
+
+ abstract HybridTimestamp scanTimestamp(HybridClock clock);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 60dccdc65e..3297c8cbc6 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -73,8 +73,6 @@ import org.jetbrains.annotations.Nullable;
public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitionStorage {
private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
- private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
-
private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
protected final int partitionId;
@@ -260,32 +258,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- /**
- * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
- *
- * @param rowId Row id.
- * @param txId Transaction id.
- * @return Read result that corresponds to the key or {@code null} if value is not found.
- * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
- * @throws StorageException If failed to read data from the storage.
- */
- // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
- @Deprecated
- public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
- if (rowId.partitionId() != partitionId) {
- throw new IllegalArgumentException(
- String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
- }
-
- VersionChain versionChain = findVersionChain(rowId);
-
- if (versionChain == null) {
- return null;
- }
-
- return findLatestRowVersion(versionChain, txId, MATCH_ALL);
- }
-
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
if (rowId.partitionId() != partitionId) {
@@ -299,7 +271,15 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return ReadResult.EMPTY;
}
- return findRowVersionByTimestamp(versionChain, timestamp);
+ if (lookingForLatestVersion(timestamp)) {
+ return findLatestRowVersion(versionChain);
+ } else {
+ return findRowVersionByTimestamp(versionChain, timestamp);
+ }
+ }
+
+ private boolean lookingForLatestVersion(HybridTimestamp timestamp) {
+ return timestamp == HybridTimestamp.MAX_VALUE;
}
private @Nullable VersionChain findVersionChain(RowId rowId) {
@@ -310,26 +290,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+ private ReadResult findLatestRowVersion(VersionChain versionChain) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
- ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
-
- if (keyFilter != null && !keyFilter.test(row)) {
- return null;
- }
-
if (versionChain.isUncommitted()) {
- UUID chainTxId = versionChain.transactionId();
+ assert versionChain.transactionId() != null;
- assert chainTxId != null;
-
- throwIfChainBelongsToAnotherTx(versionChain, txId);
+ return writeIntentToResult(versionChain, rowVersion, null);
+ } else {
+ ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
- return row;
+ return ReadResult.createFromCommitted(row, rowVersion.timestamp());
}
-
- return row;
}
private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -682,24 +654,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- /**
- * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
- * or already committed in a different transaction.
- *
- * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
- * @param txId Transaction id.
- * @return Cursor.
- * @throws StorageException If failed to read data from the storage.
- */
- // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
- @Deprecated
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
- return internalScan(keyFilter, txId);
- }
-
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
- assert timestamp != null;
+ Objects.requireNonNull(timestamp, "timestamp is null");
Cursor<VersionChain> treeCursor;
@@ -709,21 +666,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
throw new StorageException("Find failed", e);
}
- return new TimestampCursor(treeCursor, timestamp);
- }
-
- private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
- assert txId != null;
-
- Cursor<VersionChain> treeCursor;
-
- try {
- treeCursor = versionChainTree.find(null, null);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Find failed", e);
+ if (lookingForLatestVersion(timestamp)) {
+ return new LatestVersionsCursor(treeCursor);
+ } else {
+ return new TimestampCursor(treeCursor, timestamp);
}
-
- return new TransactionIdCursor(treeCursor, keyFilter, txId);
}
@Override
@@ -765,25 +712,67 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
// TODO: IGNITE-17132 Implement it
}
+ private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+ protected final Cursor<VersionChain> treeCursor;
+
+ @Nullable
+ protected ReadResult nextRead = null;
+
+ @Nullable
+ protected VersionChain currentChain = null;
+
+ protected BasePartitionTimestampCursor(Cursor<VersionChain> treeCursor) {
+ this.treeCursor = treeCursor;
+ }
+
+ @Override
+ public final ReadResult next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("The cursor is exhausted");
+ }
+
+ assert nextRead != null;
+
+ ReadResult res = nextRead;
+
+ nextRead = null;
+
+ return res;
+ }
+
+ @Override
+ public void close() throws Exception {
+ treeCursor.close();
+ }
+
+ @Override
+ public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+ if (currentChain == null) {
+ throw new IllegalStateException();
+ }
+
+ ReadResult result = findRowVersionByTimestamp(currentChain, timestamp);
+ if (result.isEmpty()) {
+ return null;
+ }
+
+ // We don't check if row conforms the key filter here, because we've already checked it.
+ return result.binaryRow();
+ }
+ }
+
/**
* Implementation of the {@link PartitionTimestampCursor} over the page memory storage.
* See {@link PartitionTimestampCursor} for the details on the API.
*/
- private class TimestampCursor implements PartitionTimestampCursor {
- private final Cursor<VersionChain> treeCursor;
-
+ private class TimestampCursor extends BasePartitionTimestampCursor {
private final HybridTimestamp timestamp;
- @Nullable
- private ReadResult nextRead = null;
-
- @Nullable
- private VersionChain currentChain = null;
-
private boolean iterationExhausted = false;
public TimestampCursor(Cursor<VersionChain> treeCursor, HybridTimestamp timestamp) {
- this.treeCursor = treeCursor;
+ super(treeCursor);
+
this.timestamp = timestamp;
}
@@ -807,50 +796,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
VersionChain chain = treeCursor.next();
- ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+ ReadResult result = findRowVersionByTimestamp(chain, timestamp);
- if (res.isEmpty()) {
+ if (result.isEmpty()) {
continue;
}
- nextRead = res;
+ nextRead = result;
currentChain = chain;
return true;
}
}
-
- @Override
- public ReadResult next() {
- if (!hasNext()) {
- throw new NoSuchElementException("The cursor is exhausted");
- }
-
- assert nextRead != null;
-
- ReadResult res = nextRead;
-
- nextRead = null;
-
- return res;
- }
-
- @Override
- public void close() {
- // No-op.
- }
-
- @Override
- public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
- if (currentChain == null) {
- throw new IllegalStateException();
- }
-
- ReadResult res = findRowVersionByTimestamp(currentChain, timestamp);
-
- // We don't check if row conforms the key filter here, because we've already checked it.
- return res.binaryRow();
- }
}
/**
@@ -858,30 +815,16 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
* Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
* or already committed in a different transaction.
*/
- private class TransactionIdCursor implements Cursor<BinaryRow> {
- private final Cursor<VersionChain> treeCursor;
-
- private final Predicate<BinaryRow> keyFilter;
-
- private final @Nullable UUID transactionId;
-
- private BinaryRow nextRow = null;
-
+ private class LatestVersionsCursor extends BasePartitionTimestampCursor {
private boolean iterationExhausted = false;
- public TransactionIdCursor(
- Cursor<VersionChain> treeCursor,
- Predicate<BinaryRow> keyFilter,
- @Nullable UUID transactionId
- ) {
- this.treeCursor = treeCursor;
- this.keyFilter = keyFilter;
- this.transactionId = transactionId;
+ public LatestVersionsCursor(Cursor<VersionChain> treeCursor) {
+ super(treeCursor);
}
@Override
public boolean hasNext() {
- if (nextRow != null) {
+ if (nextRead != null) {
return true;
}
@@ -896,32 +839,17 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
VersionChain chain = treeCursor.next();
- BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
+ ReadResult result = findLatestRowVersion(chain);
- if (row != null) {
- nextRow = row;
- return true;
+ if (result.isEmpty()) {
+ continue;
}
- }
- }
- @Override
- public BinaryRow next() {
- if (!hasNext()) {
- throw new NoSuchElementException("The cursor is exhausted");
- }
-
- assert nextRow != null;
-
- BinaryRow row = nextRow;
- nextRow = null;
-
- return row;
- }
+ nextRead = result;
+ currentChain = chain;
- @Override
- public void close() {
- // No-op.
+ return true;
+ }
}
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 85375c72ac..1a598edd3b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -31,10 +31,10 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -468,31 +468,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- /**
- * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
- *
- * @param rowId Row id.
- * @param txId Transaction id.
- * @return Read result that corresponds to the key or {@code null} if value is not found.
- * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
- * @throws StorageException If failed to read data from the storage.
- */
- // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
- @Deprecated
- public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
- return read(rowId, null, txId).binaryRow();
- }
-
/** {@inheritDoc} */
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
- return read(rowId, timestamp, null);
- }
-
- private ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
- throws TxIdMismatchException, StorageException {
- assert timestamp == null ^ txId == null;
-
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
@@ -503,55 +481,65 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
try (
// Set next partition as an upper bound.
- var readOpts = new ReadOptions().setIterateUpperBound(upperBound);
- RocksIterator baseIterator = db.newIterator(cf, readOpts);
+ var readOpts1 = new ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator baseIterator = db.newIterator(cf, readOpts1);
// "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
// It's not documented, but this is exactly how it should be used.
RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
? writeBatch.newIteratorWithBase(cf, baseIterator)
: baseIterator
) {
- if (timestamp == null) {
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ if (lookingForLatestVersions(timestamp)) {
+ return readLatestVersion(rowId, seekIterator);
+ } else {
+ return readByTimestamp(seekIterator, rowId, timestamp);
+ }
+ }
+ }
- // Seek to the first appearance of row id if timestamp isn't set.
- // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
- // Unfortunately, copy here is unavoidable with current API.
- assert keyBuf.position() == ROW_PREFIX_SIZE;
- seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ private boolean lookingForLatestVersions(HybridTimestamp timestamp) {
+ return timestamp == HybridTimestamp.MAX_VALUE;
+ }
- if (invalid(seekIterator)) {
- // No data at all.
- return ReadResult.EMPTY;
- }
+ private ReadResult readLatestVersion(RowId rowId, RocksIterator seekIterator) {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+ // Seek to the first appearance of row id if timestamp isn't set.
+ // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
+ // Unfortunately, copy here is unavoidable with current API.
+ assert keyBuf.position() == ROW_PREFIX_SIZE;
+ seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
- ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+ if (invalid(seekIterator)) {
+ // No data at all.
+ return ReadResult.EMPTY;
+ }
- int keyLength = seekIterator.key(readKeyBuf);
+ ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
- if (!matches(rowId, readKeyBuf)) {
- // Wrong row id.
- return ReadResult.EMPTY;
- }
+ int keyLength = seekIterator.key(readKeyBuf);
- boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+ if (!matches(rowId, readKeyBuf)) {
+ // It is already a different row, so no version exists for our rowId.
+ return ReadResult.EMPTY;
+ }
- byte[] valueBytes = seekIterator.value();
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- if (!isWriteIntent) {
- // There is no write-intent, return latest committed row.
- return wrapCommittedValue(valueBytes, readTimestamp(readKeyBuf));
- }
+ byte[] valueBytes = seekIterator.value();
- assert valueBytes != null;
+ return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
+ }
- validateTxId(valueBytes, txId);
+ private static ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
+ assert valueBytes != null;
- return wrapUncommittedValue(valueBytes, null);
- } else {
- return readByTimestamp(seekIterator, rowId, timestamp);
- }
+ if (!isWriteIntent) {
+ // There is no write-intent, return latest committed row.
+ return wrapCommittedValue(valueBytes, readTimestamp(keyBuf));
}
+
+ return wrapUncommittedValue(valueBytes, null);
}
/**
@@ -562,7 +550,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param timestamp Timestamp.
* @return Read result.
*/
- private @Nullable ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
+ private ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
// Put timestamp restriction according to N2O timestamps order.
@@ -685,14 +673,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
*
* @param rowId Row id.
- * @param keyByf Key buffer.
+ * @param keyBuf Key buffer.
* @return {@code true} if row id matches the key buffer, {@code false} otherwise.
*/
- private static boolean matches(RowId rowId, ByteBuffer keyByf) {
+ private static boolean matches(RowId rowId, ByteBuffer keyBuf) {
// Comparison starts from the position of the row id.
- keyByf.position(ROW_ID_OFFSET);
+ keyBuf.position(ROW_ID_OFFSET);
- return rowId.mostSignificantBits() == normalize(keyByf.getLong()) && rowId.leastSignificantBits() == normalize(keyByf.getLong());
+ return rowId.mostSignificantBits() == normalize(keyBuf.getLong()) && rowId.leastSignificantBits() == normalize(keyBuf.getLong());
}
@Override
@@ -718,13 +706,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- if (!isWriteIntent) {
- return wrapCommittedValue(value, readTimestamp(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
- }
-
- assert value != null;
-
- return wrapUncommittedValue(value, null);
+ return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
}
@Override
@@ -736,264 +718,28 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
};
}
- /**
- * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
- * or already committed in a different transaction.
- *
- * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
- * @param txId Transaction id.
- * @return Cursor.
- * @throws StorageException If failed to read data from the storage.
- */
// TODO: IGNITE-16914 Play with prefix settings and benchmark results.
- // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
- @Deprecated
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
- assert txId != null;
-
- RocksIterator it = db.newIterator(cf, scanReadOptions);
-
- // Seek iterator to the beginning of the partition.
- it.seek(partitionStartPrefix());
-
- // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
- // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
- // partition.
- // Byte buffer from a thread-local field can't be used here, because of two reasons:
- // - no one guarantees that there will only be a single cursor;
- // - no one guarantees that returned cursor will not be used by other threads.
- // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
- ByteBuffer seekKeyBuf = ByteBuffer.allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
- return new Cursor<>() {
- /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
- private BinaryRow next;
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- // Fast-path for consecutive invocations.
- if (next != null) {
- return true;
- }
-
- // Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
- while (true) {
- // We should do after each seek. Here in particular it means one of two things:
- // - partition is empty;
- // - iterator exhausted all the data in partition.
- if (invalid(it)) {
- return false;
- }
-
- // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
- // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
- // in this code. Now let's describe what we'll find, assuming that iterator found something:
- // - if timestamp is null:
- // - this seek will find the newest version of the next row in iterator. Exactly what we need.
- // - if timestamp is not null:
- // - suppose that seek key buffer has the following value: "| P0 | R0 | T0 |" (partition, row id, timestamp)
- // and iterator finds something, let's denote it as "| P0 | R1 | T1 |" (partition must match). Again, there are
- // few possibilities here:
- // - R1 == R0, this means a match. By the rules of ordering we derive that T1 >= T0. Timestamps are stored in
- // descending order, this means that we found exactly what's needed.
- // - R1 > R0, this means that we found next row and T1 is either missing (pending row) or represents the latest
- // version of the row. It doesn't matter in this case, because this row id will be reused to find its value
- // at time T0. Additional "seek" will be required to do it.
- it.seek(seekKeyBuf.array());
-
- // Finish scan if nothing was found.
- if (invalid(it)) {
- return false;
- }
-
- // Read the actual key into a direct buffer.
- int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
-
- boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
-
- directBuffer.limit(ROW_PREFIX_SIZE);
-
- // Copy actual row id into a "seekKeyBuf" buffer.
- seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
- seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
-
- // This one might look tricky. We finished processing next row. There are three options:
- // - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
- // - value is empty, we found a tombstone. We'll continue to next iteration as well;
- // - value is not empty and everything's good. We'll cache it and return from method.
- // In all three cases we need to prepare the value of "seekKeyBuf" so that it has not-yet-scanned row id in it.
- // the only valid way to do so is to treat row id payload as one big unsigned integer in Big Endian and increment it.
- // It's important to note that increment may overflow. In this case "carry flag" will go into incrementing partition id.
- // This is fine for three reasons:
- // - iterator has an upper bound, following "seek" will result in invalid iterator state.
- // - partition id itself cannot be overflown, because it's limited with a constant less than 0xFFFF.
- // It's something like 65500, I think.
- // - "seekKeyBuf" buffer value will not be used after that, so it's ok if we corrupt its data (in every other instance,
- // buffer starts with a valid partition id, which is set during buffer's initialization).
- incrementRowId(seekKeyBuf);
-
- // Cache row and return "true" if it's found and not a tombstone.
- byte[] valueBytes = it.value();
-
- BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, isWriteIntent);
-
- if (binaryRow != null && (keyFilter == null || keyFilter.test(binaryRow))) {
- if (isWriteIntent) {
- validateTxId(valueBytes, txId);
- }
-
- next = binaryRow;
-
- return true;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public BinaryRow next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- BinaryRow res = next;
-
- next = null;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- IgniteUtils.closeAll(it);
- }
- };
- }
-
/** {@inheritDoc} */
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
- assert timestamp != null;
-
- RocksIterator it = db.newIterator(cf, scanReadOptions);
-
- // You can see the motivation behind the usage of a separate (not thread-local) buffer in the transaction id
- // cursor code.
- ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
- return new PartitionTimestampCursor() {
-
- private RowId currentRowId;
-
- /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
- private ReadResult next;
-
- private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, HybridTimestamp timestamp) {
- keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
- keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
-
- putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
-
- keyBuf.position(0);
- }
-
- @Override
- public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
- if (currentRowId == null) {
- throw new IllegalStateException();
- }
-
- setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
-
- it.seek(seekKeyBuf.array());
-
- ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
-
- if (readResult.isEmpty()) {
- return null;
- }
-
- // We don't check if row conforms the key filter here, because we've already checked it.
- return readResult.binaryRow();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- // Fast-path for consecutive invocations.
- if (next != null) {
- return true;
- }
-
- if (currentRowId != null) {
- setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
- incrementRowId(seekKeyBuf);
- }
-
- currentRowId = null;
-
- // Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
- while (true) {
- it.seek(seekKeyBuf.array());
-
- if (invalid(it)) {
- return false;
- }
+ Objects.requireNonNull(timestamp, "timestamp is null");
- // We need to figure out what current row id is.
- it.key(directBuffer.position(0));
-
- directBuffer.position(ROW_ID_OFFSET);
-
- RowId rowId = getRowId(directBuffer);
-
- setKeyBuffer(seekKeyBuf, rowId, timestamp);
-
- // Seek to current row id + timestamp.
- it.seek(seekKeyBuf.array());
-
- ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
-
- if (readResult.isEmpty()) {
- // Seek to next row id as we found nothing that matches.
- incrementRowId(seekKeyBuf);
-
- continue;
- }
-
- next = readResult;
- currentRowId = rowId;
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public ReadResult next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- ReadResult res = next;
+ if (lookingForLatestVersions(timestamp)) {
+ return new ScanLatestVersionsCursor();
+ } else {
+ return new ScanByTimestampCursor(timestamp);
+ }
+ }
- next = null;
+ private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
+ keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
+ keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
- return res;
- }
+ if (timestamp != null) {
+ putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+ }
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- IgniteUtils.closeAll(it);
- }
- };
+ keyBuf.position(0);
}
@Override
@@ -1013,8 +759,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
it.key(readKeyBuf);
- readKeyBuf.position(ROW_ID_OFFSET);
-
return getRowId(readKeyBuf);
} finally {
keyBuf.limit(MAX_KEY_SIZE);
@@ -1047,8 +791,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
buf.position(0);
}
- private RowId getRowId(ByteBuffer readKeyBuf) {
- return new RowId(partitionId, normalize(readKeyBuf.getLong()), normalize(readKeyBuf.getLong()));
+ private RowId getRowId(ByteBuffer buffer) {
+ buffer.position(ROW_ID_OFFSET);
+
+ return new RowId(partitionId, normalize(buffer.getLong()), normalize(buffer.getLong()));
}
@Override
@@ -1088,7 +834,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
boolean valueHasTxId = keyBytes.length == ROW_PREFIX_SIZE;
if (!isTombstone(valueBytes, valueHasTxId)) {
- ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER).position(ROW_ID_OFFSET);
+ ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER);
RowId rowId = getRowId(keyBuf);
@@ -1295,4 +1041,201 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
}
+
+ private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+ protected final RocksIterator it = db.newIterator(cf, scanReadOptions);
+
+        // Seek key buffer. It is zero-filled on allocation and only the partition id is written at the start, so the row id
+        // payload (and the timestamp area after it) is filled with zeroes. A zero row id guarantees that it's lexicographically
+        // less than or equal to any other row id stored in the partition.
+        // A byte buffer from a thread-local field can't be used here, for two reasons:
+        // - no one guarantees that there will only be a single cursor;
+        // - no one guarantees that the returned cursor will not be used by other threads.
+        // The thing is, we need this buffer to preserve its content between invocations of the "hasNext" method.
+ protected final ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
+
+ protected RowId currentRowId;
+
+ /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+ protected ReadResult next;
+
+ @Override
+ public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+ Objects.requireNonNull(timestamp, "timestamp is null");
+
+ if (currentRowId == null) {
+ throw new IllegalStateException("currentRowId is null");
+ }
+
+ setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+
+ it.seek(seekKeyBuf.array());
+
+ ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+ if (readResult.isEmpty()) {
+ return null;
+ }
+
+ return readResult.binaryRow();
+ }
+
+ @Override
+ public final ReadResult next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ ReadResult res = next;
+
+ next = null;
+
+ return res;
+ }
+
+ @Override
+ public final void close() throws Exception {
+ it.close();
+ }
+ }
+
+ private final class ScanLatestVersionsCursor extends BasePartitionTimestampCursor {
+ @Override
+ public boolean hasNext() {
+ // Fast-path for consecutive invocations.
+ if (next != null) {
+ return true;
+ }
+
+ if (currentRowId != null) {
+ setKeyBuffer(seekKeyBuf, currentRowId, null);
+ incrementRowId(seekKeyBuf);
+ }
+
+ currentRowId = null;
+
+ // Prepare direct buffer slice to read keys from the iterator.
+ ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+ while (true) {
+            // At this point, seekKeyBuf contains a row id that is above every row id already scanned, but not greater than any
+            // row id that is yet to be scanned. On the first iteration the row id is filled with zeroes; on later iterations it
+            // is the last visited row id incremented by one (see the "incrementRowId" calls).
+            // This cursor only needs the latest version of each row, so timestamps don't take part in the search. Within a
+            // single row id, keys are ordered from newest to oldest: the write intent (a key without a timestamp, i.e. of length
+            // ROW_PREFIX_SIZE) comes first, followed by committed versions in descending timestamp order. So, assuming the
+            // iterator finds something, this seek lands on the newest version of the next row - exactly what we need.
+            // NOTE(review): "seekKeyBuf" is MAX_KEY_SIZE bytes long and its whole array is passed to "seek", so the bytes after
+            // the row id (zeroes initially, or a timestamp written by "committed") are part of the seek key. A write intent key
+            // is a strict prefix of such a seek key and therefore sorts before it: if the next stored row id is exactly the
+            // incremented value, that row's write intent (and, after a "committed" call, every version newer than the timestamp
+            // left in the buffer) would be skipped, so a stale version (or nothing at all) would be returned for that row.
+            // Seeking with only the first ROW_PREFIX_SIZE bytes of the buffer would avoid this - TODO confirm and fix.
+            // The timestamp-based search is implemented separately, in ScanByTimestampCursor.
+ it.seek(seekKeyBuf.array());
+
+ // Finish scan if nothing was found.
+ if (invalid(it)) {
+ return false;
+ }
+
+ // Read the actual key into a direct buffer.
+ int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+ directBuffer.limit(ROW_PREFIX_SIZE);
+
+ // Copy actual row id into a "seekKeyBuf" buffer.
+ seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
+ seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+
+            // This one might look tricky. We've just read the newest version of the next row. There are two options:
+            // - the read result is empty, i.e. the newest version is a tombstone. We'll continue to the next iteration;
+            // - the read result is not empty and everything's good. We'll cache it and return from the method.
+            // In both cases we need to prepare the value of "seekKeyBuf" so that it contains a not-yet-scanned row id.
+            // The way to do so is to treat the row id payload as one big unsigned Big Endian integer and increment it.
+            // It's important to note that the increment may overflow.
+            // In this case the "carry flag" goes into the partition id, incrementing it. This is fine for three reasons:
+            // - the iterator has an upper bound, so the following "seek" will result in an invalid iterator state;
+            // - the partition id itself cannot overflow, because it's limited by a constant less than 0xFFFF
+            //   (reportedly around 65500 - TODO confirm the exact limit);
+            // - the "seekKeyBuf" value will not be used after that, so it's OK if its data gets corrupted (in every other case
+            //   the buffer starts with a valid partition id, which is set during the buffer's initialization).
+            // Note: "seekKeyBuf" already holds the current row id here, copied from the key found by the seek above.
+ incrementRowId(seekKeyBuf);
+
+ // Cache row and return "true" if it's found and not a tombstone.
+ byte[] valueBytes = it.value();
+
+ directBuffer.limit(keyLength);
+ ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+
+ if (!readResult.isEmpty()) {
+ next = readResult;
+ currentRowId = getRowId(directBuffer);
+
+ return true;
+ }
+ }
+ }
+ }
+
+ private final class ScanByTimestampCursor extends BasePartitionTimestampCursor {
+ private final HybridTimestamp timestamp;
+
+ public ScanByTimestampCursor(HybridTimestamp timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // Fast-path for consecutive invocations.
+ if (next != null) {
+ return true;
+ }
+
+ if (currentRowId != null) {
+ setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+ incrementRowId(seekKeyBuf);
+ }
+
+ currentRowId = null;
+
+ // Prepare direct buffer slice to read keys from the iterator.
+ ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+ while (true) {
+ it.seek(seekKeyBuf.array());
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ // We need to figure out what current row id is.
+ it.key(directBuffer.position(0));
+
+ RowId rowId = getRowId(directBuffer);
+
+ setKeyBuffer(seekKeyBuf, rowId, timestamp);
+
+ // Seek to current row id + timestamp.
+ it.seek(seekKeyBuf.array());
+
+ ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
+
+ if (readResult.isEmpty()) {
+ // Seek to next row id as we found nothing that matches.
+ incrementRowId(seekKeyBuf);
+
+ continue;
+ }
+
+ next = readResult;
+ currentRowId = rowId;
+
+ return true;
+ }
+ }
+ }
}