You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 16:55:50 UTC
[ignite-3] 02/02: IGNITE-17968 Fix write-intents being filtered out when they are tombstones (#1252)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c955bd8f823b6586e469869d6d8179d524362300
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Oct 26 19:30:58 2022 +0300
IGNITE-17968 Fix write-intents being filtered out when they are tombstones (#1252)
---
.../internal/storage/MvPartitionStorage.java | 3 +-
.../apache/ignite/internal/storage/ReadResult.java | 3 +-
.../storage/AbstractMvPartitionStorageTest.java | 32 +++++++-------
.../storage/impl/TestMvPartitionStorage.java | 2 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 13 ++++--
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 49 +++++++++++++++++-----
6 files changed, 72 insertions(+), 30 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7f87994f03..52322783ae 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -170,7 +170,8 @@ public interface MvPartitionStorage extends AutoCloseable {
Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
/**
- * Scans the partition and returns a cursor of values at the given timestamp.
+ * Scans the partition and returns a cursor of values at the given timestamp. This cursor filters out committed tombstones, but not
+ * tombstones in the write-intent state.
*
* @param timestamp Timestamp. Can't be {@code null}.
* @return Cursor.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 0bd10c4adb..0b6a5f0dcb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -130,7 +130,8 @@ public class ReadResult {
}
/**
- * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+ * Returns the timestamp of the most recent commit of the row. Not {@code null} if a committed version exists, this is a
+ * write-intent, and the read was made with a timestamp. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
* even for write intents having a preceding committed version.
*
* @return Timestamp of the most recent commit of the row.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 76c6db0b00..599265679f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1269,30 +1269,34 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
- public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
- testScanDoesNotSeeTombstones(tsProvider, false);
+ public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+ RowId rowId = insert(binaryRow, txId);
+ HybridTimestamp commitTs = clock.now();
+ commitWrite(rowId, commitTs);
+
+ addWrite(rowId, null, newTransactionId());
+
+ try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+ assertTrue(cursor.hasNext());
+
+ ReadResult next = cursor.next();
+ assertNull(next.binaryRow());
+ assertEquals(commitTs, next.newestCommitTimestamp());
+
+ assertFalse(cursor.hasNext());
+ }
}
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) throws Exception {
- testScanDoesNotSeeTombstones(tsProvider, true);
- }
-
- private void testScanDoesNotSeeTombstones(ScanTimestampProvider scantsProvider, boolean commitRemoval) throws Exception {
RowId rowId = insert(binaryRow, txId);
commitWrite(rowId, clock.now());
addWrite(rowId, null, newTransactionId());
- if (commitRemoval) {
- commitWrite(rowId, clock.now());
- }
-
- assertScanSeesNothing(scantsProvider);
- }
+ commitWrite(rowId, clock.now());
- private void assertScanSeesNothing(ScanTimestampProvider scanTsProvider) throws Exception {
- try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+ try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
assertFalse(cursor.hasNext());
}
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f6043ae9a8..50e27bd297 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -351,7 +351,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
VersionChain chain = iterator.next();
ReadResult readResult = read(chain, timestamp, null);
- if (!readResult.isEmpty()) {
+ if (!readResult.isEmpty() || readResult.isWriteIntent()) {
currentChain = chain;
currentReadResult = readResult;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3297c8cbc6..d05f62ed46 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -296,7 +296,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
if (versionChain.isUncommitted()) {
assert versionChain.transactionId() != null;
- return writeIntentToResult(versionChain, rowVersion, null);
+ HybridTimestamp newestCommitTs = null;
+
+ if (versionChain.hasCommittedVersions()) {
+ long newestCommitLink = versionChain.newestCommittedLink();
+ newestCommitTs = readRowVersion(newestCommitLink, ALWAYS_LOAD_VALUE).timestamp();
+ }
+
+ return writeIntentToResult(versionChain, rowVersion, newestCommitTs);
} else {
ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
@@ -798,7 +805,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
VersionChain chain = treeCursor.next();
ReadResult result = findRowVersionByTimestamp(chain, timestamp);
- if (result.isEmpty()) {
+ if (result.isEmpty() && !result.isWriteIntent()) {
continue;
}
@@ -841,7 +848,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
VersionChain chain = treeCursor.next();
ReadResult result = findLatestRowVersion(chain);
- if (result.isEmpty()) {
+ if (result.isEmpty() && !result.isWriteIntent()) {
continue;
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 1a598edd3b..06862c8091 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1115,9 +1115,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
currentRowId = null;
// Prepare direct buffer slice to read keys from the iterator.
- ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+ ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
while (true) {
+ currentKeyBuffer.position(0);
+
// At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
// other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
// in this code. Now let's describe what we'll find, assuming that iterator found something:
@@ -1140,15 +1142,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
// Read the actual key into a direct buffer.
- int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+ int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE));
boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- directBuffer.limit(ROW_PREFIX_SIZE);
+ currentKeyBuffer.limit(ROW_PREFIX_SIZE);
+
+ RowId rowId = getRowId(currentKeyBuffer);
// Copy actual row id into a "seekKeyBuf" buffer.
- seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
- seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+ seekKeyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
+ seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
// This one might look tricky. We finished processing next row. There are three options:
// - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
@@ -1168,12 +1172,37 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
// Cache row and return "true" if it's found and not a tombstone.
byte[] valueBytes = it.value();
- directBuffer.limit(keyLength);
- ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+ HybridTimestamp nextCommitTimestamp = null;
+
+ if (isWriteIntent) {
+ it.next();
+
+ if (!invalid(it)) {
+ ByteBuffer key = ByteBuffer.wrap(it.key()).order(KEY_BYTE_ORDER);
+
+ if (matches(rowId, key)) {
+ // This is a next version of current row.
+ nextCommitTimestamp = readTimestamp(key);
+ }
+ }
+ }
+
+ currentKeyBuffer.limit(keyLength);
+
+ assert valueBytes != null;
+
+ ReadResult readResult;
+
+ if (!isWriteIntent) {
+ // There is no write-intent, return latest committed row.
+ readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+ } else {
+ readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+ }
- if (!readResult.isEmpty()) {
+ if (!readResult.isEmpty() || readResult.isWriteIntent()) {
next = readResult;
- currentRowId = getRowId(directBuffer);
+ currentRowId = rowId;
return true;
}
@@ -1224,7 +1253,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
- if (readResult.isEmpty()) {
+ if (readResult.isEmpty() && !readResult.isWriteIntent()) {
// Seek to next row id as we found nothing that matches.
incrementRowId(seekKeyBuf);