You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 09:38:00 UTC

[ignite-3] 01/01: IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17968
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 64f82fe38372889419c2cf99b054df30fc54da8b
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Oct 26 13:37:49 2022 +0400

    IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone
---
 .../apache/ignite/internal/storage/ReadResult.java |  3 +-
 .../storage/AbstractMvPartitionStorageTest.java    | 18 ++++++++-
 .../storage/impl/TestMvPartitionStorage.java       |  2 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 13 ++++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 46 +++++++++++++++++-----
 5 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index cca33ed69f..90f01bb21f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -130,7 +130,8 @@ public class ReadResult {
     }
 
     /**
-     * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+     * Returns timestamp of the most recent commit of the row. Not {@code null} if a committed version exists, this is a
+     * write-intent, and the read was made with a timestamp. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
      * even for write intents having a preceding committed version.
      *
      * @return Timestamp of the most recent commit of the row.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 76c6db0b00..7576f92cef 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1269,8 +1269,22 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
-    public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
-        testScanDoesNotSeeTombstones(tsProvider, false);
+    public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        HybridTimestamp commitTs = clock.now();
+        commitWrite(rowId, commitTs);
+
+        addWrite(rowId, null, newTransactionId());
+
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+            assertTrue(cursor.hasNext());
+
+            ReadResult next = cursor.next();
+            assertNull(next.binaryRow());
+            assertEquals(commitTs, next.newestCommitTimestamp());
+
+            assertFalse(cursor.hasNext());
+        }
     }
 
     @ParameterizedTest
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f6043ae9a8..50e27bd297 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -351,7 +351,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     VersionChain chain = iterator.next();
                     ReadResult readResult = read(chain, timestamp, null);
 
-                    if (!readResult.isEmpty()) {
+                    if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                         currentChain = chain;
                         currentReadResult = readResult;
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3297c8cbc6..d05f62ed46 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -296,7 +296,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         if (versionChain.isUncommitted()) {
             assert versionChain.transactionId() != null;
 
-            return writeIntentToResult(versionChain, rowVersion, null);
+            HybridTimestamp newestCommitTs = null;
+
+            if (versionChain.hasCommittedVersions()) {
+                long newestCommitLink = versionChain.newestCommittedLink();
+                newestCommitTs = readRowVersion(newestCommitLink, ALWAYS_LOAD_VALUE).timestamp();
+            }
+
+            return writeIntentToResult(versionChain, rowVersion, newestCommitTs);
         } else {
             ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
@@ -798,7 +805,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findRowVersionByTimestamp(chain, timestamp);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
@@ -841,7 +848,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findLatestRowVersion(chain);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 1a598edd3b..ea5524377f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1115,7 +1115,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
 
             while (true) {
                 // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
@@ -1140,15 +1140,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 }
 
                 // Read the actual key into a direct buffer.
-                int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+                int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE));
 
                 boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                directBuffer.limit(ROW_PREFIX_SIZE);
+                currentKeyBuffer.limit(ROW_PREFIX_SIZE);
 
                 // Copy actual row id into a "seekKeyBuf" buffer.
-                seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
-                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+                seekKeyBuf.putLong(ROW_ID_OFFSET, currentKeyBuffer.getLong(ROW_ID_OFFSET));
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentKeyBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
 
                 // This one might look tricky. We finished processing next row. There are three options:
                 //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
@@ -1168,12 +1168,38 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Cache row and return "true" if it's found and not a tombstone.
                 byte[] valueBytes = it.value();
 
-                directBuffer.limit(keyLength);
-                ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+                RowId rowId = getRowId(currentKeyBuffer);
+                HybridTimestamp nextCommitTimestamp = null;
+
+                if (isWriteIntent) {
+                    it.next();
+
+                    if (!invalid(it)) {
+                        ByteBuffer key = ByteBuffer.wrap(it.key()).order(KEY_BYTE_ORDER);
+
+                        if (matches(rowId, key)) {
+                            // This is a next version of current row.
+                            // This is the next version of the current row.
+                        }
+                    }
+                }
+
+                currentKeyBuffer.limit(keyLength);
+
+                assert valueBytes != null;
+
+                ReadResult readResult;
+
+                if (!isWriteIntent) {
+                    // There is no write-intent, return latest committed row.
+                    readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+                } else {
+                    readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+                }
 
-                if (!readResult.isEmpty()) {
+                if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                     next = readResult;
-                    currentRowId = getRowId(directBuffer);
+                    currentRowId = rowId;
 
                     return true;
                 }
@@ -1224,7 +1250,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
 
-                if (readResult.isEmpty()) {
+                if (readResult.isEmpty() && !readResult.isWriteIntent()) {
                     // Seek to next row id as we found nothing that matches.
                     incrementRowId(seekKeyBuf);