You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text version.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/09/27 13:30:14 UTC
[ignite-3] 01/02: IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-17720
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 28a299ddd14e6469fe815dbbee71fd84ad59316a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Sep 27 16:19:07 2022 +0400
IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
---
.../internal/storage/MvPartitionStorage.java | 4 +-
.../internal/storage/PartitionScanCursor.java | 35 ++++
.../apache/ignite/internal/storage/ReadResult.java | 8 +
.../storage/AbstractMvPartitionStorageTest.java | 60 +++++--
.../TestConcurrentHashMapMvPartitionStorage.java | 67 +++++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 179 +++++++++++++++------
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 153 +++++++++++++++++-
.../distributed/storage/VersionedRowStore.java | 5 +-
8 files changed, 438 insertions(+), 73 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index de9742bf86..4d3dc080b3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -228,7 +228,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @deprecated Use {@link #scan(Predicate, HybridTimestamp)}
*/
@Deprecated
- default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+ default PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
return scan(keyFilter, convertTimestamp(timestamp));
}
@@ -241,7 +241,7 @@ public interface MvPartitionStorage extends AutoCloseable {
* @throws TxIdMismatchException If there's another pending update associated with different transaction id.
* @throws StorageException If failed to read data from the storage.
*/
- Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
+ PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
/**
* Returns rows count belongs to current storage.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
new file mode 100644
index 0000000000..8e76c3131b
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Partition cursor.
+ */
+public interface PartitionScanCursor extends Cursor<ReadResult> {
+ /**
+ * Returns a committed row within the current row id.
+ *
+ * @param timestamp Commit timestamp.
+ * @return Row or {@code null} if it doesn't exist.
+ */
+ BinaryRow committed(HybridTimestamp timestamp);
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 4ebd74fe3a..a1d912c6b2 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -125,4 +125,12 @@ public class ReadResult {
public int commitPartitionId() {
return commitPartitionId;
}
+
+ public boolean isWriteIntent() {
+ return transactionId != null;
+ }
+
+ public boolean isEmpty() {
+ return this == EMPTY;
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 21e01306c8..b88f388d56 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -93,7 +92,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
/**
* Scans partition inside of consistency closure.
*/
- protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+ protected Cursor<ReadResult> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
return storage.runConsistently(() -> storage.scan(filter, timestamp));
}
@@ -159,7 +158,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
@Test
public void testScanOverEmpty() throws Exception {
assertEquals(List.of(), convert(scan(row -> true, newTransactionId())));
- assertEquals(List.of(), convert(scan(row -> true, clock.now())));
+ assertEquals(List.of(), convert0(scan(row -> true, clock.now())));
}
/**
@@ -312,13 +311,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
HybridTimestamp ts5 = clock.now();
// Full scan with various timestamp values.
- assertEquals(List.of(), convert(scan(row -> true, ts1)));
+ assertEquals(List.of(), convert0(scan(row -> true, ts1)));
- assertEquals(List.of(value1), convert(scan(row -> true, ts2)));
- assertEquals(List.of(value1), convert(scan(row -> true, ts3)));
+ assertEquals(List.of(value1), convert0(scan(row -> true, ts2)));
+ assertEquals(List.of(value1), convert0(scan(row -> true, ts3)));
- assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4)));
- assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5)));
+ assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts4)));
+ assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts5)));
}
@Test
@@ -364,6 +363,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
}
+ private List<TestValue> convert0(Cursor<ReadResult> cursor) throws Exception {
+ try (cursor) {
+ return cursor.stream()
+ .map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
+ .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
+ .collect(Collectors.toList());
+ }
+ }
+
@Test
void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
@@ -765,12 +773,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720")
void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
commitAbortAndAddUncommitted();
- try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
- BinaryRow foundRow = cursor.next();
+ try (Cursor<ReadResult> cursor = storage.scan(k -> true, clock.now())) {
+ BinaryRow foundRow = cursor.next().binaryRow();
assertRowMatches(foundRow, binaryRow3);
@@ -1025,6 +1032,37 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
}
}
+ @Test
+ void testScanWithWriteIntent() throws Exception {
+ RowId rowId1 = new RowId(PARTITION_ID);
+
+ HybridTimestamp commit1ts = clock.now();
+
+ storage.runConsistently(() -> {
+ addWrite(rowId1, binaryRow, newTransactionId());
+
+ commitWrite(rowId1, commit1ts);
+
+ addWrite(rowId1, binaryRow2, newTransactionId());
+
+ return null;
+ });
+
+ try (PartitionScanCursor cursor = storage.scan(r -> true, clock.now())) {
+ assertTrue(cursor.hasNext());
+
+ ReadResult next = cursor.next();
+
+ assertTrue(next.isWriteIntent());
+
+ assertRowMatches(next.binaryRow(), binaryRow2);
+
+ BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+
+ assertRowMatches(committedRow, binaryRow);
+ }
+ }
+
@Test
void testScanVersionsWithWriteIntent() throws Exception {
RowId rowId = new RowId(PARTITION_ID, 100, 0);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 65103d0ae9..be90f55363 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.chm;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import java.util.stream.Stream;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -327,14 +329,65 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
/** {@inheritDoc} */
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
- Iterator<BinaryRow> iterator = map.values().stream()
- .map(versionChain -> read(versionChain, timestamp, null, filter))
- .map(ReadResult::binaryRow)
- .filter(Objects::nonNull)
- .iterator();
+ public PartitionScanCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+ Iterator<VersionChain> iterator = map.values().iterator();
- return Cursor.fromIterator(iterator);
+ return new PartitionScanCursor() {
+
+ private VersionChain currentChain;
+
+ private ReadResult currentReadResult;
+
+ @Override
+ public BinaryRow committed(HybridTimestamp timestamp) {
+ ReadResult read = read(currentChain, timestamp, null, filter);
+
+ if (read.transactionId() == null) {
+ return read.binaryRow();
+ }
+
+ return null;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentReadResult != null) {
+ return true;
+ }
+
+ while (iterator.hasNext()) {
+ VersionChain chain = iterator.next();
+ ReadResult readResult = read(chain, timestamp, null, filter);
+
+ if (!readResult.isEmpty()) {
+ currentChain = chain;
+ currentReadResult = readResult;
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public ReadResult next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ ReadResult res = currentReadResult;
+
+ currentReadResult = null;
+
+ return res;
+ }
+ };
}
/** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 96d4843873..9cc60288cd 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -273,7 +274,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return null;
}
- return findLatestRowVersion(versionChain, txId, MATCH_ALL).binaryRow();
+ return findLatestRowVersion(versionChain, txId, MATCH_ALL);
}
@Override
@@ -300,13 +301,13 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- private ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+ private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
if (!keyFilter.test(row)) {
- return ReadResult.EMPTY;
+ return null;
}
if (versionChain.isUncommitted()) {
@@ -316,11 +317,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
throwIfChainBelongsToAnotherTx(versionChain, txId);
- return ReadResult.createFromWriteIntent(row, versionChain.transactionId(), versionChain.commitTableId(), null,
- versionChain.commitPartitionId());
+ return row;
}
- return ReadResult.createFromCommitted(row);
+ return row;
}
private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -351,36 +351,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return new ByteBufferRow(rowVersion.value());
}
- private @Nullable BinaryRow findRowVersionInChain(
- VersionChain versionChain,
- @Nullable UUID transactionId,
- @Nullable HybridTimestamp timestamp,
- Predicate<BinaryRow> keyFilter
- ) {
- assert transactionId != null ^ timestamp != null;
-
- if (transactionId != null) {
- ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
-
- if (res == null) {
- return null;
- }
-
- return res.binaryRow();
- } else {
- ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
-
- if (res == null) {
- return null;
- }
-
- BinaryRow row = res.binaryRow();
-
- return keyFilter.test(row) ? row : null;
- }
- }
-
private ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+ assert timestamp != null;
+
long headLink = versionChain.headLink();
if (versionChain.isUncommitted()) {
@@ -660,16 +633,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
- return internalScan(keyFilter, txId, null);
+ return internalScan(keyFilter, txId);
}
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
- return internalScan(keyFilter, null, timestamp);
+ public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+ assert timestamp != null;
+
+ IgniteCursor<VersionChain> treeCursor;
+
+ try {
+ treeCursor = versionChainTree.find(null, null);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Find failed", e);
+ }
+
+ return new TimestampCursor(treeCursor, keyFilter, timestamp);
}
- private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
- assert txId != null ^ timestamp != null;
+ private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
+ assert txId != null;
IgniteCursor<VersionChain> treeCursor;
@@ -679,7 +662,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
throw new StorageException("Find failed", e);
}
- return new ScanCursor(treeCursor, keyFilter, txId, timestamp);
+ return new ScanCursor(treeCursor, keyFilter, txId);
}
@Override
@@ -712,6 +695,110 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
// TODO: IGNITE-17132 Implement it
}
+ private class TimestampCursor implements PartitionScanCursor {
+ private final IgniteCursor<VersionChain> treeCursor;
+
+ private final Predicate<BinaryRow> keyFilter;
+
+ private final HybridTimestamp timestamp;
+
+ private ReadResult nextRead = null;
+
+ private boolean iterationExhausted = false;
+
+ public TimestampCursor(
+ IgniteCursor<VersionChain> treeCursor,
+ Predicate<BinaryRow> keyFilter,
+ HybridTimestamp timestamp
+ ) {
+ this.treeCursor = treeCursor;
+ this.keyFilter = keyFilter;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextRead != null) {
+ return true;
+ }
+
+ if (iterationExhausted) {
+ return false;
+ }
+
+ while (true) {
+ boolean positionedToNext = tryAdvanceTreeCursor();
+
+ if (!positionedToNext) {
+ iterationExhausted = true;
+ return false;
+ }
+
+ VersionChain chain = getCurrentChainFromTreeCursor();
+ ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+ if (res.isEmpty()) {
+ continue;
+ }
+
+ if (!keyFilter.test(res.binaryRow())) {
+ continue;
+ }
+
+ nextRead = res;
+ return true;
+ }
+ }
+
+ private boolean tryAdvanceTreeCursor() {
+ try {
+ return treeCursor.next();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error when trying to advance tree cursor", e);
+ }
+ }
+
+ private VersionChain getCurrentChainFromTreeCursor() {
+ try {
+ return treeCursor.get();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Failed to get element from tree cursor", e);
+ }
+ }
+
+ @Override
+ public ReadResult next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("The cursor is exhausted");
+ }
+
+ assert nextRead != null;
+
+ ReadResult res = nextRead;
+ nextRead = null;
+
+ return res;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public BinaryRow committed(HybridTimestamp timestamp) {
+ if (iterationExhausted) {
+ throw new NoSuchElementException();
+ }
+
+ VersionChain chain = getCurrentChainFromTreeCursor();
+
+ ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+ return res.binaryRow();
+ }
+ }
+
private class ScanCursor implements Cursor<BinaryRow> {
private final IgniteCursor<VersionChain> treeCursor;
@@ -719,8 +806,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
private final @Nullable UUID transactionId;
- private final @Nullable HybridTimestamp timestamp;
-
private BinaryRow nextRow = null;
private boolean iterationExhausted = false;
@@ -728,13 +813,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
public ScanCursor(
IgniteCursor<VersionChain> treeCursor,
Predicate<BinaryRow> keyFilter,
- @Nullable UUID transactionId,
- @Nullable HybridTimestamp timestamp
+ @Nullable UUID transactionId
) {
this.treeCursor = treeCursor;
this.keyFilter = keyFilter;
this.transactionId = transactionId;
- this.timestamp = timestamp;
}
@Override
@@ -756,7 +839,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
VersionChain chain = getCurrentChainFromTreeCursor();
- BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+ BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
if (row != null) {
nextRow = row;
@@ -797,7 +880,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public void close() {
- // no-op
+ // No-op.
}
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2d0d46e4be..3028d9afe2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -541,6 +542,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// It is guaranteed by descending order of timestamps.
seekIterator.seek(keyBuf.array());
+ return processIterator(seekIterator, rowId, timestamp, keyBuf);
+ }
+
+ private static ReadResult processIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp, ByteBuffer keyBuf) {
// There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
// To avoid returning its value, we have to check that actual key matches what we need.
// Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
@@ -689,8 +694,150 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
- public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
- return scan(keyFilter, timestamp, null);
+ public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+ assert timestamp != null;
+
+ // Set next partition as an upper bound.
+ ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+
+ RocksIterator it = db.newIterator(cf, options);
+
+ // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
+ // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
+ // partition.
+ // Byte buffer from a thread-local field can't be used here, because of two reasons:
+ // - no one guarantees that there will only be a single cursor;
+ // - no one guarantees that returned cursor will not be used by other threads.
+ // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
+ ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(BIG_ENDIAN).putShort((short) partitionId);
+
+ return new PartitionScanCursor() {
+
+ private RowId currentRowId;
+
+ @Override
+ public BinaryRow committed(HybridTimestamp timestamp) {
+ if (currentRowId == null) {
+ throw new IllegalStateException();
+ }
+
+ seekKeyBuf.putLong(ROW_ID_OFFSET, currentRowId.mostSignificantBits());
+ seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentRowId.leastSignificantBits());
+ putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+ it.seek(seekKeyBuf.position(0));
+
+ ReadResult readResult = processIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+ if (readResult.isEmpty()) {
+ return null;
+ }
+
+ return readResult.binaryRow();
+ }
+
+ /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+ private ReadResult next;
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ // Fast-path for consecutive invocations.
+ if (next != null) {
+ return true;
+ }
+
+ // Prepare direct buffer slice to read keys from the iterator.
+ ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+ while (true) {
+ it.seek(seekKeyBuf);
+
+ // We should do after each seek. Here in particular it means one of two things:
+ // - partition is empty;
+ // - iterator exhausted all the data in partition.
+ if (invalid(it)) {
+ return false;
+ }
+
+ it.key(directBuffer.position(0));
+
+ directBuffer.position(ROW_ID_OFFSET);
+ long msb = directBuffer.getLong();
+ long lsb = directBuffer.getLong();
+
+ var rowId = new RowId(partitionId, msb, lsb);
+
+ seekKeyBuf.putLong(ROW_ID_OFFSET, msb);
+ seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+ putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+ it.seek(seekKeyBuf.position(0));
+
+ ReadResult readResult = processIterator(it, rowId, timestamp, seekKeyBuf);
+
+ if (readResult.isEmpty()) {
+ incrementRowId(seekKeyBuf);
+
+ seekKeyBuf.position(0);
+ continue;
+ }
+
+ next = readResult;
+ currentRowId = rowId;
+
+ return true;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReadResult next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ ReadResult res = next;
+
+ next = null;
+
+ incrementRowId(seekKeyBuf);
+
+ seekKeyBuf.position(0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws Exception {
+ IgniteUtils.closeAll(it, options);
+ }
+
+ private void incrementRowId(ByteBuffer buf) {
+ long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
+
+ buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+
+ if (lsb != 0L) {
+ return;
+ }
+
+ long msb = 1 + buf.getLong(ROW_ID_OFFSET);
+
+ buf.putLong(ROW_ID_OFFSET, msb);
+
+ if (msb != 0L) {
+ return;
+ }
+
+ short partitionId = (short) (1 + buf.getShort(0));
+
+ assert partitionId != 0;
+
+ buf.putShort(0, partitionId);
+ }
+ };
}
private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
@@ -698,7 +845,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
assert timestamp == null ^ txId == null;
// Set next partition as an upper bound.
- var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
RocksIterator it = db.newIterator(cf, options);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index 940e8047f9..aa09719b8c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxManager;
@@ -542,7 +543,7 @@ public class VersionedRowStore {
*/
public Cursor<BinaryRow> scan(Predicate<BinaryRow> pred) {
// TODO <MUTED> https://issues.apache.org/jira/browse/IGNITE-17309 Transactional support for partition scans
- Cursor<BinaryRow> delegate = storage.scan(pred, Timestamp.nextVersion());
+ Cursor<ReadResult> delegate = storage.scan(pred, Timestamp.nextVersion());
// TODO asch add tx support IGNITE-15087.
return new Cursor<BinaryRow>() {
@@ -560,7 +561,7 @@ public class VersionedRowStore {
}
if (delegate.hasNext()) {
- cur = delegate.next();
+ cur = delegate.next().binaryRow();
return cur != null ? true : hasNext(); // Skip tombstones.
}