Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/09/19 16:07:19 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1102: IGNITE-17627 Extend MvPartitionStorage read API with write intent resolution capabilities

ibessonov commented on code in PR #1102:
URL: https://github.com/apache/ignite-3/pull/1102#discussion_r974397392


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -106,12 +106,12 @@ private static HybridTimestamp convertTimestamp(Timestamp timestamp) {
      *
      * @param rowId Row id.
      * @param txId Transaction id.
-     * @return Binary row that corresponds to the key or {@code null} if value is not found.
+     * @return Read result that corresponds to the key or {@code null} if value is not found.
      * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
      * @throws StorageException If failed to read data from the storage.
      */
     @Nullable
-    BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;
+    ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;

Review Comment:
   OK, what's the point of returning enriched information in an RW transaction?



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+    /** Unset commit partition id value. */
+    public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+    /** Data. */
+    private final BinaryRow binaryRow;
+
+    /** Transaction id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID transactionId;
+
+    /** Commit table id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID commitTableId;
+
+    /** Commit partition id. If this is not a write-intent it is equal to {@link #UNDEFINED_COMMIT_PARTITION_ID}. */
+    private final int commitPartitionId;
+
+    /**
+     * Timestamp of the newest commit of the data. Not {@code null} iff committed version exists, this is a
+     * write-intent and read was made with a timestamp.
+     */
+    private final @Nullable HybridTimestamp newestCommitTs;
+
+    private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, @Nullable UUID commitTableId,
+            @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+        this.binaryRow = binaryRow;
+
+        // If transaction is not null, then commitTableId and commitPartitionId should be defined.
+        assert (transactionId == null) || (commitTableId != null && commitPartitionId != -1);
+
+        // If transaction id is null, then commitTableId and commitPartitionId should not be defined.
+        assert (transactionId != null) || (commitTableId == null && commitPartitionId == -1);
+
+        this.transactionId = transactionId;
+        this.commitTableId = commitTableId;
+        this.newestCommitTs = newestCommitTs;
+        this.commitPartitionId = commitPartitionId;
+    }
+
+    public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
+            @Nullable HybridTimestamp lastCommittedTimestamp, int commitPartitionId) {
+        return new ReadResult(binaryRow, transactionId, commitTableId, lastCommittedTimestamp, commitPartitionId);
+    }
+
+    public static ReadResult createFromCommitted(BinaryRow binaryRow) {
+        return new ReadResult(binaryRow, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    /**
+     * Returns binary row representation of the data.
+     *
+     * @return Binary row representation of the data.
+     */
+    public BinaryRow binaryRow() {
+        return binaryRow;
+    }
+
+    /**
+     * Returns transaction id part of the transaction state if this is a write-intent,
+     * {@code null} otherwise.
+     *
+     * @return Transaction id part of the transaction state if this is a write-intent,
+     *         {@code null} otherwise.
+     */
+    public @Nullable UUID transactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Returns commit table id part of the transaction state if this is a write-intent,
+     * {@code null} otherwise.
+     *
+     * @return Commit table id part of the transaction state if this is a write-intent,
+     *         {@code null} otherwise.
+     */
+    public @Nullable UUID commitTableId() {
+        return commitTableId;
+    }
+
+    /**
+     * Returns timestamp of the most recent commit of the row.
+     *
+     * @return Timestamp of the most recent commit of the row.
+     */
+    public @Nullable HybridTimestamp newestCommitTs() {

Review Comment:
   I guess method names could use full words rather than abbreviations such as `Ts`.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -678,9 +712,52 @@ void scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() {
     void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, clock.now());
+        BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
 
-        assertRowMatches(foundRow, binaryRow);
+        // We see the uncommitted row.
+        assertRowMatches(foundRow, binaryRow3);
+    }
+
+    @Test
+    void readByTimestampBeforeAndAfterUncommittedWrite() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, UUID.randomUUID(), UUID.randomUUID(), 5);
+
+            storage.commitWrite(rowId, commitTs);
+            return null;

Review Comment:
   ```suggestion
               storage.commitWrite(rowId, commitTs);
   
               return null;
   ```



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -213,27 +223,85 @@ private static BinaryRow read(
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {

Review Comment:
   I believe you can replace a weird `notWriteIntent` method with a good `isWriteIntent` method and just use it here. Was I the author of that method with negation in its name? Jesus!
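
A minimal sketch of the positively named predicate suggested above. `VersionChainSketch` is a hypothetical, simplified stand-in for the test storage's chain node, and a `Long` replaces `HybridTimestamp`; the only thing taken from the diff is the assumption that a `null` timestamp marks a write intent (`versionChain.ts == null`).

```java
import java.util.UUID;

/** Hypothetical, simplified stand-in for the test storage's version chain node. */
final class VersionChainSketch {
    /** Commit timestamp; null while the version is still a write intent (assumption taken from the diff). */
    final Long ts;

    /** Id of the transaction that owns the write intent; null for committed versions. */
    final UUID txId;

    VersionChainSketch(Long ts, UUID txId) {
        this.ts = ts;
        this.txId = txId;
    }

    /** Positively named predicate: call sites read isWriteIntent() or !isWriteIntent(). */
    boolean isWriteIntent() {
        return ts == null;
    }
}
```

With the predicate on the node itself, the local `boolean isWriteIntent = versionChain.ts == null;` at the call site would no longer be needed.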



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -280,38 +305,65 @@ private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId
         assert transactionId != null ^ timestamp != null;
 
         if (transactionId != null) {
-            return findLatestRowVersion(versionChain, transactionId, keyFilter);
+            ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
+
+            if (res == null) {
+                return null;
+            }
+
+            return res.binaryRow();
         } else {
-            ByteBufferRow row = findRowVersionByTimestamp(versionChain, timestamp);
+            ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
+
+            if (res == null) {
+                return null;
+            }
+
+            BinaryRow row = res.binaryRow();
 
             return keyFilter.test(row) ? row : null;
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
-        if (!versionChain.hasCommittedVersions()) {
-            return null;
-        }
-
-        long newestCommittedLink = versionChain.newestCommittedLink();
+    private @Nullable ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+        long headLink = versionChain.headLink();
 
         ScanVersionChainByTimestamp scanByTimestamp = new ScanVersionChainByTimestamp(partitionId);
 
         try {
-            rowVersionDataPageReader.traverse(newestCommittedLink, scanByTimestamp, timestamp);
+            rowVersionDataPageReader.traverse(headLink, scanByTimestamp, timestamp);

Review Comment:
   Maybe we should rename `scanByTimestamp`



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -240,9 +249,23 @@ private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta indexMeta)
             return null;
         }
 
-        throwIfChainBelongsToAnotherTx(versionChain, txId);
+        if (versionChain.isUncommitted()) {
+            UUID chainTxId = versionChain.transactionId();
+
+            assert chainTxId != null;
+
+            throwIfChainBelongsToAnotherTx(versionChain, txId);
+
+            return ReadResult.createFromWriteIntent(
+                    row,
+                    versionChain.transactionId(),
+                    versionChain.commitTableId(),
+                    null,

Review Comment:
   It looks weird that there's a gap between the table id and the partition id; I'd love to see them side by side.
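
One possible reading of this comment, as a hedged sketch: move the nullable timestamp to the end of the factory's parameter list so that the commit table id and commit partition id are adjacent. `WriteIntentInfo` is a hypothetical class, not the `ReadResult` from the pull request, and a `Long` stands in for `HybridTimestamp`.

```java
import java.util.UUID;

/** Hypothetical sketch; not the ReadResult class from the pull request. */
final class WriteIntentInfo {
    final UUID transactionId;
    final UUID commitTableId;
    final int commitPartitionId;

    /** Null when no committed version exists. A Long stands in for HybridTimestamp. */
    final Long newestCommitTimestamp;

    private WriteIntentInfo(UUID transactionId, UUID commitTableId, int commitPartitionId,
            Long newestCommitTimestamp) {
        this.transactionId = transactionId;
        this.commitTableId = commitTableId;
        this.commitPartitionId = commitPartitionId;
        this.newestCommitTimestamp = newestCommitTimestamp;
    }

    /** Table id and partition id are adjacent; the optional timestamp goes last. */
    static WriteIntentInfo createFromWriteIntent(UUID transactionId, UUID commitTableId,
            int commitPartitionId, Long newestCommitTimestamp) {
        return new WriteIntentInfo(transactionId, commitTableId, commitPartitionId, newestCommitTimestamp);
    }
}
```

A call site then reads `createFromWriteIntent(txId, commitTableId, commitPartitionId, null)`, with the two commit coordinates next to each other.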



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -142,28 +151,30 @@ default BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException
      * @return Row id.
      * @throws StorageException If failed to write data into the storage.
      *
-     * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID)} with explicit replicated id must be
-     *      used instead.
+     * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} with explicit replicated
+     *      id must be used instead.
      */
     @Deprecated
     RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
 
     /**
-     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id.
-     * In details:
-     * - if there is no uncommitted version, a new uncommitted version is added
-     * - if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version
-     * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown
+     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. In details: - if there is no

Review Comment:
   You broke the formatting. Please fix it.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -165,14 +185,14 @@ public void testAddWrite() {
         // Write from the same transaction.
         addWrite(rowId, binaryRow, txId);
 
-        // Read without timestamp returns uncommited row.
+        // Read without timestamp returns uncommitted row.
         assertRowMatches(read(rowId, txId), binaryRow);
 
         // Read with wrong transaction id should throw exception.
         assertThrows(TxIdMismatchException.class, () -> read(rowId, newTransactionId()));
 
         // Read with timestamp returns null.

Review Comment:
   Comment doesn't match the code anymore



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -746,4 +822,184 @@ void testAppliedIndex() {
 
         assertEquals(1, storage.persistedIndex());
     }
+
+    @Test
+    void testReadWithinBeforeAndAfterTwoCommits() {
+        HybridTimestamp before = clock.now();
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        UUID txId = UUID.randomUUID();
+        UUID commitTableId = UUID.randomUUID();
+        int commitPartitionId = 1337;
+
+        HybridTimestamp first = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, txId, commitTableId, commitPartitionId);
+
+            storage.commitWrite(rowId, first);
+            return null;
+        });
+
+        HybridTimestamp betweenCommits = clock.now();
+
+        HybridTimestamp second = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 1);
+
+            storage.commitWrite(rowId, second);
+            return null;
+        });
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow3, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 2);
+
+            return null;
+        });
+
+        HybridTimestamp after = clock.now();
+
+        // Read before commits.
+        ReadResult res = storage.read(rowId, before);
+        assertNull(res);
+
+        // Read at exact time of first commit.
+        res = storage.read(rowId, first);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow);
+
+        // Read between two commits.
+        res = storage.read(rowId, betweenCommits);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow);
+
+        // Read at exact time of second commit.
+        res = storage.read(rowId, second);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow2);
+
+        // Read after second commit (write intent).
+        res = storage.read(rowId, after);
+
+        assertNotNull(res);
+        assertNotNull(res.newestCommitTs());
+        assertEquals(second, res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow3);
+    }
+
+    @Test
+    void testWrongPartition() {
+        RowId rowId = commitAbortAndAddUncommitted();
+
+        var row = new RowId(rowId.partitionId() + 1, rowId.mostSignificantBits(), rowId.leastSignificantBits());
+
+        assertNull(read(row, clock.now()));
+    }
+
+    @Test
+    void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
+        RowId rowId = commitAbortAndAddUncommitted();
+
+        RowId lowerRowId = getPreviousRowId(rowId);
+
+        assertNull(read(lowerRowId, clock.now()));
+    }
+
+    @Test
+    void testReadingNothingByTxIdWithLowerRowId() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        UUID txId = UUID.randomUUID();
+
+        storage.runConsistently(() -> {
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+            return null;
+        });
+
+        assertNull(read(lowerRowId, txId));
+    }
+
+    @Test
+    void testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        UUID txId = UUID.randomUUID();
+
+        storage.runConsistently(() -> {
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(lowerRowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), PARTITION_ID);
+            storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+            storage.commitWrite(higherRowId, clock.now());
+
+            return null;
+        });
+
+        assertRowMatches(read(higherRowId, clock.now()), binaryRow);
+    }
+
+    @Test
+    void testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        storage.runConsistently(() -> {
+            UUID txId = UUID.randomUUID();
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(lowerRowId, binaryRow, txId, tableId, PARTITION_ID);
+            storage.addWrite(higherRowId, binaryRow2, txId, tableId, PARTITION_ID);
+
+            return null;
+        });
+
+        assertRowMatches(read(lowerRowId, clock.now()), binaryRow);
+    }
+
+    /**
+     * Returns row id that is lexicographically smaller (by the value of one) than the argument.
+     *
+     * @param value Row id.
+     * @return Row id value minus 1.
+     */
+    private RowId getPreviousRowId(RowId value) {
+        Pair<Long, Long> previous128Uint = getPrevious128Uint(value.mostSignificantBits(), value.leastSignificantBits());
+
+        return new RowId(value.partitionId(), previous128Uint.getFirst(), previous128Uint.getSecond());
+    }
+
+    /**
+     * Performs a decrement operation on a 128-bit unsigned value that is represented by two longs.
+     *
+     * @param msb Most significant bytes of 128-bit unsigned integer.
+     * @param lsb Least significant bytes of 128-bit unsigned integer.
+     * @return Less by one value.
+     */
+    private Pair<Long, Long> getPrevious128Uint(long msb, long lsb) {
+        BigInteger unsignedMsb = BigInteger.valueOf(msb).and(UNSIGNED_LONG_MASK);
+        BigInteger unsignedLsb = BigInteger.valueOf(lsb).and(UNSIGNED_LONG_MASK);
+
+        BigInteger biRepresentation = unsignedMsb.shiftLeft(Long.SIZE).add(unsignedLsb);
+
+        BigInteger lesserBi = biRepresentation.subtract(BigInteger.ONE);
+
+        long newMsb = lesserBi.shiftRight(Long.SIZE).and(UNSIGNED_LONG_MASK).longValue();
+        long newLsb = lesserBi.and(UNSIGNED_LONG_MASK).longValue();
+
+        return new Pair<>(newMsb, newLsb);

Review Comment:
   ```
   assert (msb | lsb) != 0L;
   
   if (lsb-- == 0L) {
       --msb;
   }
   
   return new Pair<>(msb, lsb);
   ```
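
To illustrate why the primitive version above is equivalent to the `BigInteger` helper in the diff, here is a self-contained sketch that implements both and can be used to compare them. The class name, the `long[]` return type (instead of `Pair`), and the definition of `UNSIGNED_LONG_MASK` are assumptions made for this example. An explicit exception replaces the `assert`, since Java assertions are disabled by default.

```java
import java.math.BigInteger;

/** Sketch comparing two ways of decrementing a 128-bit unsigned value held in two longs. */
final class Uint128Decrement {
    /** 2^64 - 1; assumed definition of the mask used by the helper in the diff. */
    private static final BigInteger UNSIGNED_LONG_MASK =
            BigInteger.ONE.shiftLeft(Long.SIZE).subtract(BigInteger.ONE);

    /** Primitive version from the review comment; returns {msb, lsb}. */
    static long[] previous(long msb, long lsb) {
        if ((msb | lsb) == 0L) {
            throw new IllegalArgumentException("Cannot decrement zero");
        }

        // Borrow from the high word only when the low word is zero and wraps around.
        if (lsb-- == 0L) {
            --msb;
        }

        return new long[] {msb, lsb};
    }

    /** BigInteger version, mirroring the helper in the diff; returns {msb, lsb}. */
    static long[] previousViaBigInteger(long msb, long lsb) {
        BigInteger unsignedMsb = BigInteger.valueOf(msb).and(UNSIGNED_LONG_MASK);
        BigInteger unsignedLsb = BigInteger.valueOf(lsb).and(UNSIGNED_LONG_MASK);

        BigInteger lesser = unsignedMsb.shiftLeft(Long.SIZE).add(unsignedLsb).subtract(BigInteger.ONE);

        return new long[] {
                lesser.shiftRight(Long.SIZE).and(UNSIGNED_LONG_MASK).longValue(),
                lesser.and(UNSIGNED_LONG_MASK).longValue()
        };
    }
}
```

The borrow case is the interesting one: for `msb = 1, lsb = 0` both methods yield `msb = 0` and `lsb = -1`, that is, all 64 low bits set.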



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -213,27 +223,85 @@ private static BinaryRow read(
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        boolean hasWriteIntent = false;
+
+        if (cur.ts == null) {
+            hasWriteIntent = true;
+
+            if (cur.next == null) {
+                // We only have a write-intent.
+                BinaryRow binaryRow = cur.row;
+
+                if (filter != null && !filter.test(binaryRow)) {
+                    return null;
+                }
+
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
+                        cur.commitPartitionId);
+            }
+
             cur = cur.next;
         }
 
+        return walkVersionChain(versionChain, timestamp, filter, cur, hasWriteIntent);
+    }
+
+    @Nullable
+    private static ReadResult walkVersionChain(VersionChain chainHead, @NotNull HybridTimestamp timestamp,

Review Comment:
   We don't use `@NotNull`.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -678,9 +712,52 @@ void scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() {
     void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, clock.now());
+        BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
 
-        assertRowMatches(foundRow, binaryRow);
+        // We see the uncommitted row.
+        assertRowMatches(foundRow, binaryRow3);
+    }
+
+    @Test
+    void readByTimestampBeforeAndAfterUncommittedWrite() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, UUID.randomUUID(), UUID.randomUUID(), 5);
+
+            storage.commitWrite(rowId, commitTs);
+            return null;
+        });
+
+        UUID txId2 = UUID.randomUUID();

Review Comment:
   You know, at this point I would like to have diagrams that explain the state of the data. Long tests are kinda hard to follow when they do some complicated stuff.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -213,7 +218,11 @@ private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta indexMeta)
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+    public @Nullable ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {

Review Comment:
   By the way, is it OK to return nulls now? I mean, what if you have a tombstone? There would then be two ways to return an empty value, which is not cool. `ReadResult` itself should never be null.
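
A hedged sketch of what "never null" could look like: the storage always returns a result object, and emptiness is expressed in exactly one way, through a nullable row. `ReadResultSketch` and `StorageSketch` are hypothetical names, a `String` stands in for `BinaryRow`, and treating a tombstone the same as a missing row is an assumption of this example rather than something the review decides.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical read result that is never null itself; only the row may be null. */
final class ReadResultSketch {
    /** Shared instance meaning "no value is visible for this read". */
    static final ReadResultSketch EMPTY = new ReadResultSketch(null);

    /** Row payload; null for missing rows and tombstones. A String stands in for BinaryRow. */
    private final String row;

    private ReadResultSketch(String row) {
        this.row = row;
    }

    static ReadResultSketch committed(String row) {
        return row == null ? EMPTY : new ReadResultSketch(row);
    }

    String row() {
        return row;
    }

    boolean isEmpty() {
        return row == null;
    }
}

/** Hypothetical single-version storage used only to show the calling convention. */
final class StorageSketch {
    private final Map<Long, String> rows = new HashMap<>();

    void put(long rowId, String row) {
        rows.put(rowId, row);
    }

    /** A tombstone is modelled as a stored null value (assumption). */
    void putTombstone(long rowId) {
        rows.put(rowId, null);
    }

    /** Never returns null: a missing row and a tombstone both yield an empty result. */
    ReadResultSketch read(long rowId) {
        return ReadResultSketch.committed(rows.get(rowId));
    }
}
```

Callers then check `result.isEmpty()` and never need a null check on the result itself.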



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -213,27 +223,85 @@ private static BinaryRow read(
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        boolean hasWriteIntent = false;
+
+        if (cur.ts == null) {
+            hasWriteIntent = true;
+
+            if (cur.next == null) {
+                // We only have a write-intent.
+                BinaryRow binaryRow = cur.row;
+
+                if (filter != null && !filter.test(binaryRow)) {
+                    return null;
+                }
+
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
+                        cur.commitPartitionId);
+            }
+
             cur = cur.next;
         }
 
+        return walkVersionChain(versionChain, timestamp, filter, cur, hasWriteIntent);
+    }
+
+    @Nullable
+    private static ReadResult walkVersionChain(VersionChain chainHead, @NotNull HybridTimestamp timestamp,
+            @Nullable Predicate<BinaryRow> filter, VersionChain firstCommit, boolean hasWriteIntent) {
+        boolean isLatestCommit = true;
+
+        VersionChain cur = firstCommit;
+
         while (cur != null) {
-            if (timestamp.compareTo(cur.begin) >= 0) {
+            int compareResult = timestamp.compareTo(cur.ts);

Review Comment:
   Ok, I see that this code is rather complicated. Maybe we can make it simpler.
   If you check `timestamp.compareTo(firstCommit.ts)` _before the loop_ and it's positive, you return a write intent straight away (if it exists).
   Otherwise, the old loop will pretty much do it
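
The simplification described above can be sketched roughly as follows, assuming a chain ordered newest first whose head may be a write intent with a `null` timestamp. `ChainNode`, `ChainReader`, and `Result` are hypothetical names, `long` values stand in for `HybridTimestamp`, the key filter is omitted, and returning `null` for "nothing visible" simply mirrors the diff as posted.

```java
/** Hypothetical chain node: newest version first, a null ts marks a write intent. */
final class ChainNode {
    final Long ts;
    final String row;
    final ChainNode next;

    ChainNode(Long ts, String row, ChainNode next) {
        this.ts = ts;
        this.row = row;
        this.next = next;
    }
}

final class ChainReader {
    /** Hypothetical result holder. */
    static final class Result {
        final String row;
        final boolean writeIntent;
        final Long newestCommitTs;

        Result(String row, boolean writeIntent, Long newestCommitTs) {
            this.row = row;
            this.writeIntent = writeIntent;
            this.newestCommitTs = newestCommitTs;
        }
    }

    static Result read(ChainNode head, long timestamp) {
        if (head == null) {
            return null;
        }

        // Derived from the head, so no separate flag has to be passed around.
        boolean hasWriteIntent = head.ts == null;
        ChainNode firstCommit = hasWriteIntent ? head.next : head;

        if (hasWriteIntent) {
            if (firstCommit == null) {
                // Only a write intent exists.
                return new Result(head.row, true, null);
            }

            // Check before the loop: a read newer than the newest commit sees the write intent.
            if (timestamp > firstCommit.ts) {
                return new Result(head.row, true, firstCommit.ts);
            }
        }

        // The plain loop over committed versions handles everything else.
        for (ChainNode cur = firstCommit; cur != null; cur = cur.next) {
            if (timestamp >= cur.ts) {
                return new Result(cur.row, false, null);
            }
        }

        return null;
    }
}
```

For a chain with commits at timestamps 10 and 20 followed by a write intent, a read at 20 returns the second committed row, while a read at 25 returns the write intent together with 20 as the newest commit timestamp. This matches the expectations in `testReadWithinBeforeAndAfterTwoCommits`.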



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -213,27 +223,85 @@ private static BinaryRow read(
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        boolean hasWriteIntent = false;

Review Comment:
   Is this variable necessary? It's calculable from `versionChain` that you pass into `walk*`



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -105,7 +125,7 @@ protected RowId insert(BinaryRow binaryRow, UUID txId) {
      * Adds/updates a write-intent inside of consistency closure.
      */
     protected BinaryRow addWrite(RowId rowId, BinaryRow binaryRow, UUID txId) {
-        return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId));
+        return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId, UUID.randomUUID(), 0));

Review Comment:
   You could create a constant for a table id


