You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 09:37:59 UTC

[ignite-3] branch ignite-17968 created (now 64f82fe383)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-17968
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 64f82fe383 IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone

This branch includes the following new commits:

     new 64f82fe383 IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/01: IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17968
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 64f82fe38372889419c2cf99b054df30fc54da8b
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Oct 26 13:37:49 2022 +0400

    IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone
---
 .../apache/ignite/internal/storage/ReadResult.java |  3 +-
 .../storage/AbstractMvPartitionStorageTest.java    | 18 ++++++++-
 .../storage/impl/TestMvPartitionStorage.java       |  2 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 13 ++++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 46 +++++++++++++++++-----
 5 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index cca33ed69f..90f01bb21f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -130,7 +130,8 @@ public class ReadResult {
     }
 
     /**
-     * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+     * Returns timestamp of the most recent commit of the row. Not {@code null} if committed version exists, this is a
+     * write-intent and read was made with a timestamp. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
      * even for write intents having a preceding committed version.
      *
      * @return Timestamp of the most recent commit of the row.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 76c6db0b00..7576f92cef 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1269,8 +1269,22 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
-    public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
-        testScanDoesNotSeeTombstones(tsProvider, false);
+    public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        HybridTimestamp commitTs = clock.now();
+        commitWrite(rowId, commitTs);
+
+        addWrite(rowId, null, newTransactionId());
+
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+            assertTrue(cursor.hasNext());
+
+            ReadResult next = cursor.next();
+            assertNull(next.binaryRow());
+            assertEquals(commitTs, next.newestCommitTimestamp());
+
+            assertFalse(cursor.hasNext());
+        }
     }
 
     @ParameterizedTest
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f6043ae9a8..50e27bd297 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -351,7 +351,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     VersionChain chain = iterator.next();
                     ReadResult readResult = read(chain, timestamp, null);
 
-                    if (!readResult.isEmpty()) {
+                    if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                         currentChain = chain;
                         currentReadResult = readResult;
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3297c8cbc6..d05f62ed46 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -296,7 +296,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         if (versionChain.isUncommitted()) {
             assert versionChain.transactionId() != null;
 
-            return writeIntentToResult(versionChain, rowVersion, null);
+            HybridTimestamp newestCommitTs = null;
+
+            if (versionChain.hasCommittedVersions()) {
+                long newestCommitLink = versionChain.newestCommittedLink();
+                newestCommitTs = readRowVersion(newestCommitLink, ALWAYS_LOAD_VALUE).timestamp();
+            }
+
+            return writeIntentToResult(versionChain, rowVersion, newestCommitTs);
         } else {
             ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
@@ -798,7 +805,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findRowVersionByTimestamp(chain, timestamp);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
@@ -841,7 +848,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findLatestRowVersion(chain);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 1a598edd3b..ea5524377f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1115,7 +1115,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
 
             while (true) {
                 // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
@@ -1140,15 +1140,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 }
 
                 // Read the actual key into a direct buffer.
-                int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+                int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE));
 
                 boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                directBuffer.limit(ROW_PREFIX_SIZE);
+                currentKeyBuffer.limit(ROW_PREFIX_SIZE);
 
                 // Copy actual row id into a "seekKeyBuf" buffer.
-                seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
-                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+                seekKeyBuf.putLong(ROW_ID_OFFSET, currentKeyBuffer.getLong(ROW_ID_OFFSET));
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentKeyBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
 
                 // This one might look tricky. We finished processing next row. There are three options:
                 //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
@@ -1168,12 +1168,38 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Cache row and return "true" if it's found and not a tombstone.
                 byte[] valueBytes = it.value();
 
-                directBuffer.limit(keyLength);
-                ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+                RowId rowId = getRowId(currentKeyBuffer);
+                HybridTimestamp nextCommitTimestamp = null;
+
+                if (isWriteIntent) {
+                    it.next();
+
+                    if (!invalid(it)) {
+                        ByteBuffer key = ByteBuffer.wrap(it.key()).order(KEY_BYTE_ORDER);
+
+                        if (matches(rowId, key)) {
+                            // This is a next version of current row.
+                            nextCommitTimestamp = readTimestamp(key);
+                        }
+                    }
+                }
+
+                currentKeyBuffer.limit(keyLength);
+
+                assert valueBytes != null;
+
+                ReadResult readResult;
+
+                if (!isWriteIntent) {
+                    // There is no write-intent, return latest committed row.
+                    readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+                } else {
+                    readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+                }
 
-                if (!readResult.isEmpty()) {
+                if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                     next = readResult;
-                    currentRowId = getRowId(directBuffer);
+                    currentRowId = rowId;
 
                     return true;
                 }
@@ -1224,7 +1250,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
 
-                if (readResult.isEmpty()) {
+                if (readResult.isEmpty() && !readResult.isWriteIntent()) {
                     // Seek to next row id as we found nothing that matches.
                     incrementRowId(seekKeyBuf);