You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/09/27 13:30:13 UTC

[ignite-3] branch ignite-17720 created (now 36d43ed087)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-17720
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 36d43ed087 IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities

This branch includes the following new commits:

     new 28a299ddd1 IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
     new 36d43ed087 IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/02: IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17720
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 28a299ddd14e6469fe815dbbee71fd84ad59316a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Sep 27 16:19:07 2022 +0400

    IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
---
 .../internal/storage/MvPartitionStorage.java       |   4 +-
 .../internal/storage/PartitionScanCursor.java      |  35 ++++
 .../apache/ignite/internal/storage/ReadResult.java |   8 +
 .../storage/AbstractMvPartitionStorageTest.java    |  60 +++++--
 .../TestConcurrentHashMapMvPartitionStorage.java   |  67 +++++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 179 +++++++++++++++------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 153 +++++++++++++++++-
 .../distributed/storage/VersionedRowStore.java     |   5 +-
 8 files changed, 438 insertions(+), 73 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index de9742bf86..4d3dc080b3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -228,7 +228,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @deprecated Use {@link #scan(Predicate, HybridTimestamp)}
      */
     @Deprecated
-    default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+    default PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
         return scan(keyFilter, convertTimestamp(timestamp));
     }
 
@@ -241,7 +241,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
      * @throws StorageException If failed to read data from the storage.
      */
-    Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
+    PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
 
     /**
      * Returns rows count belongs to current storage.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
new file mode 100644
index 0000000000..8e76c3131b
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
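+ * Cursor over the {@link ReadResult}s of a partition scan that can also look up a committed version of the current row.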
+ */
+public interface PartitionScanCursor extends Cursor<ReadResult> {
+    /**
+     * Returns a committed row within the current row id.
+     *
+     * @param timestamp Commit timestamp.
+     * @return Row or {@code null} if it doesn't exist.
+     */
+    BinaryRow committed(HybridTimestamp timestamp);
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 4ebd74fe3a..a1d912c6b2 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -125,4 +125,12 @@ public class ReadResult {
     public int commitPartitionId() {
         return commitPartitionId;
     }
+
+    public boolean isWriteIntent() {
+        return transactionId != null;
+    }
+
+    public boolean isEmpty() {
+        return this == EMPTY;
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 21e01306c8..b88f388d56 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -93,7 +92,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     /**
      * Scans partition inside of consistency closure.
      */
-    protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+    protected Cursor<ReadResult> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
         return storage.runConsistently(() -> storage.scan(filter, timestamp));
     }
 
@@ -159,7 +158,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     @Test
     public void testScanOverEmpty() throws Exception {
         assertEquals(List.of(), convert(scan(row -> true, newTransactionId())));
-        assertEquals(List.of(), convert(scan(row -> true, clock.now())));
+        assertEquals(List.of(), convert0(scan(row -> true, clock.now())));
     }
 
     /**
@@ -312,13 +311,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         HybridTimestamp ts5 = clock.now();
 
         // Full scan with various timestamp values.
-        assertEquals(List.of(), convert(scan(row -> true, ts1)));
+        assertEquals(List.of(), convert0(scan(row -> true, ts1)));
 
-        assertEquals(List.of(value1), convert(scan(row -> true, ts2)));
-        assertEquals(List.of(value1), convert(scan(row -> true, ts3)));
+        assertEquals(List.of(value1), convert0(scan(row -> true, ts2)));
+        assertEquals(List.of(value1), convert0(scan(row -> true, ts3)));
 
-        assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4)));
-        assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5)));
+        assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts4)));
+        assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts5)));
     }
 
     @Test
@@ -364,6 +363,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    private List<TestValue> convert0(Cursor<ReadResult> cursor) throws Exception {
+        try (cursor) {
+            return cursor.stream()
+                    .map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
+                    .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
+                    .collect(Collectors.toList());
+        }
+    }
+
     @Test
     void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
         RowId rowId = insert(binaryRow, txId);
@@ -765,12 +773,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720")
     void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
         commitAbortAndAddUncommitted();
 
-        try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
-            BinaryRow foundRow = cursor.next();
+        try (Cursor<ReadResult> cursor = storage.scan(k -> true, clock.now())) {
+            BinaryRow foundRow = cursor.next().binaryRow();
 
             assertRowMatches(foundRow, binaryRow3);
 
@@ -1025,6 +1032,37 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @Test
+    void testScanWithWriteIntent() throws Exception {
+        RowId rowId1 = new RowId(PARTITION_ID);
+
+        HybridTimestamp commit1ts = clock.now();
+
+        storage.runConsistently(() -> {
+            addWrite(rowId1, binaryRow, newTransactionId());
+
+            commitWrite(rowId1, commit1ts);
+
+            addWrite(rowId1, binaryRow2, newTransactionId());
+
+            return null;
+        });
+
+        try (PartitionScanCursor cursor = storage.scan(r -> true, clock.now())) {
+            assertTrue(cursor.hasNext());
+
+            ReadResult next = cursor.next();
+
+            assertTrue(next.isWriteIntent());
+
+            assertRowMatches(next.binaryRow(), binaryRow2);
+
+            BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+
+            assertRowMatches(committedRow, binaryRow);
+        }
+    }
+
     @Test
     void testScanVersionsWithWriteIntent() throws Exception {
         RowId rowId = new RowId(PARTITION_ID, 100, 0);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 65103d0ae9..be90f55363 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.chm;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -327,14 +329,65 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
-        Iterator<BinaryRow> iterator = map.values().stream()
-                .map(versionChain -> read(versionChain, timestamp, null, filter))
-                .map(ReadResult::binaryRow)
-                .filter(Objects::nonNull)
-                .iterator();
+    public PartitionScanCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+        Iterator<VersionChain> iterator = map.values().iterator();
 
-        return Cursor.fromIterator(iterator);
+        return new PartitionScanCursor() {
+
+            private VersionChain currentChain;
+
+            private ReadResult currentReadResult;
+
+            @Override
+            public BinaryRow committed(HybridTimestamp timestamp) {
+                ReadResult read = read(currentChain, timestamp, null, filter);
+
+                if (read.transactionId() == null) {
+                    return read.binaryRow();
+                }
+
+                return null;
+            }
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (currentReadResult != null) {
+                    return true;
+                }
+
+                while (iterator.hasNext()) {
+                    VersionChain chain = iterator.next();
+                    ReadResult readResult = read(chain, timestamp, null, filter);
+
+                    if (!readResult.isEmpty()) {
+                        currentChain = chain;
+                        currentReadResult = readResult;
+
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+
+            @Override
+            public ReadResult next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                ReadResult res = currentReadResult;
+
+                currentReadResult = null;
+
+                return res;
+            }
+        };
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 96d4843873..9cc60288cd 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -273,7 +274,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             return null;
         }
 
-        return findLatestRowVersion(versionChain, txId, MATCH_ALL).binaryRow();
+        return findLatestRowVersion(versionChain, txId, MATCH_ALL);
     }
 
     @Override
@@ -300,13 +301,13 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+    private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
 
         ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
         if (!keyFilter.test(row)) {
-            return ReadResult.EMPTY;
+            return null;
         }
 
         if (versionChain.isUncommitted()) {
@@ -316,11 +317,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
             throwIfChainBelongsToAnotherTx(versionChain, txId);
 
-            return ReadResult.createFromWriteIntent(row, versionChain.transactionId(), versionChain.commitTableId(), null,
-                    versionChain.commitPartitionId());
+            return row;
         }
 
-        return ReadResult.createFromCommitted(row);
+        return row;
     }
 
     private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -351,36 +351,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return new ByteBufferRow(rowVersion.value());
     }
 
-    private @Nullable BinaryRow findRowVersionInChain(
-            VersionChain versionChain,
-            @Nullable UUID transactionId,
-            @Nullable HybridTimestamp timestamp,
-            Predicate<BinaryRow> keyFilter
-    ) {
-        assert transactionId != null ^ timestamp != null;
-
-        if (transactionId != null) {
-            ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
-
-            if (res == null) {
-                return null;
-            }
-
-            return res.binaryRow();
-        } else {
-            ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
-
-            if (res == null) {
-                return null;
-            }
-
-            BinaryRow row = res.binaryRow();
-
-            return keyFilter.test(row) ? row : null;
-        }
-    }
-
     private ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+        assert timestamp != null;
+
         long headLink = versionChain.headLink();
 
         if (versionChain.isUncommitted()) {
@@ -660,16 +633,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        return internalScan(keyFilter, txId, null);
+        return internalScan(keyFilter, txId);
     }
 
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
-        return internalScan(keyFilter, null, timestamp);
+    public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+        assert timestamp != null;
+
+        IgniteCursor<VersionChain> treeCursor;
+
+        try {
+            treeCursor = versionChainTree.find(null, null);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Find failed", e);
+        }
+
+        return new TimestampCursor(treeCursor, keyFilter, timestamp);
     }
 
-    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
-        assert txId != null ^ timestamp != null;
+    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
+        assert txId != null;
 
         IgniteCursor<VersionChain> treeCursor;
 
@@ -679,7 +662,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new ScanCursor(treeCursor, keyFilter, txId, timestamp);
+        return new ScanCursor(treeCursor, keyFilter, txId);
     }
 
     @Override
@@ -712,6 +695,110 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         // TODO: IGNITE-17132 Implement it
     }
 
+    private class TimestampCursor implements PartitionScanCursor {
+        private final IgniteCursor<VersionChain> treeCursor;
+
+        private final Predicate<BinaryRow> keyFilter;
+
+        private final HybridTimestamp timestamp;
+
+        private ReadResult nextRead = null;
+
+        private boolean iterationExhausted = false;
+
+        public TimestampCursor(
+                IgniteCursor<VersionChain> treeCursor,
+                Predicate<BinaryRow> keyFilter,
+                HybridTimestamp timestamp
+        ) {
+            this.treeCursor = treeCursor;
+            this.keyFilter = keyFilter;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextRead != null) {
+                return true;
+            }
+
+            if (iterationExhausted) {
+                return false;
+            }
+
+            while (true) {
+                boolean positionedToNext = tryAdvanceTreeCursor();
+
+                if (!positionedToNext) {
+                    iterationExhausted = true;
+                    return false;
+                }
+
+                VersionChain chain = getCurrentChainFromTreeCursor();
+                ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+                if (res.isEmpty()) {
+                    continue;
+                }
+
+                if (!keyFilter.test(res.binaryRow())) {
+                    continue;
+                }
+
+                nextRead = res;
+                return true;
+            }
+        }
+
+        private boolean tryAdvanceTreeCursor() {
+            try {
+                return treeCursor.next();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error when trying to advance tree cursor", e);
+            }
+        }
+
+        private VersionChain getCurrentChainFromTreeCursor() {
+            try {
+                return treeCursor.get();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to get element from tree cursor", e);
+            }
+        }
+
+        @Override
+        public ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+            nextRead = null;
+
+            return res;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public BinaryRow committed(HybridTimestamp timestamp) {
+            if (iterationExhausted) {
+                throw new NoSuchElementException();
+            }
+
+            VersionChain chain = getCurrentChainFromTreeCursor();
+
+            ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+            return res.binaryRow();
+        }
+    }
+
     private class ScanCursor implements Cursor<BinaryRow> {
         private final IgniteCursor<VersionChain> treeCursor;
 
@@ -719,8 +806,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         private final @Nullable UUID transactionId;
 
-        private final @Nullable HybridTimestamp timestamp;
-
         private BinaryRow nextRow = null;
 
         private boolean iterationExhausted = false;
@@ -728,13 +813,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         public ScanCursor(
                 IgniteCursor<VersionChain> treeCursor,
                 Predicate<BinaryRow> keyFilter,
-                @Nullable UUID transactionId,
-                @Nullable HybridTimestamp timestamp
+                @Nullable UUID transactionId
         ) {
             this.treeCursor = treeCursor;
             this.keyFilter = keyFilter;
             this.transactionId = transactionId;
-            this.timestamp = timestamp;
         }
 
         @Override
@@ -756,7 +839,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = getCurrentChainFromTreeCursor();
-                BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+                BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
 
                 if (row != null) {
                     nextRow = row;
@@ -797,7 +880,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         @Override
         public void close() {
-            // no-op
+            // No-op.
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2d0d46e4be..3028d9afe2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -541,6 +542,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         // It is guaranteed by descending order of timestamps.
         seekIterator.seek(keyBuf.array());
 
+        return processIterator(seekIterator, rowId, timestamp, keyBuf);
+    }
+
+    private static ReadResult processIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp, ByteBuffer keyBuf) {
         // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key matches what we need.
         // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
@@ -689,8 +694,150 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
-        return scan(keyFilter, timestamp, null);
+    public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+        assert timestamp != null;
+
+        // Set next partition as an upper bound.
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+
+        RocksIterator it = db.newIterator(cf, options);
+
+        // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
+        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
+        // partition.
+        // Byte buffer from a thread-local field can't be used here, because of two reasons:
+        //  - no one guarantees that there will only be a single cursor;
+        //  - no one guarantees that returned cursor will not be used by other threads.
+        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
+        ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(BIG_ENDIAN).putShort((short) partitionId);
+
+        return new PartitionScanCursor() {
+
+            private RowId currentRowId;
+
+            @Override
+            public BinaryRow committed(HybridTimestamp timestamp) {
+                if (currentRowId == null) {
+                    throw new IllegalStateException();
+                }
+
+                seekKeyBuf.putLong(ROW_ID_OFFSET, currentRowId.mostSignificantBits());
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentRowId.leastSignificantBits());
+                putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+                it.seek(seekKeyBuf.position(0));
+
+                ReadResult readResult = processIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+                if (readResult.isEmpty()) {
+                    return null;
+                }
+
+                return readResult.binaryRow();
+            }
+
+            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+            private ReadResult next;
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean hasNext() {
+                // Fast-path for consecutive invocations.
+                if (next != null) {
+                    return true;
+                }
+
+                // Prepare direct buffer slice to read keys from the iterator.
+                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+                while (true) {
+                    it.seek(seekKeyBuf);
+
+                    // Check iterator validity after each seek. Here in particular an invalid iterator means one of two things:
+                    //  - partition is empty;
+                    //  - iterator exhausted all the data in partition.
+                    if (invalid(it)) {
+                        return false;
+                    }
+
+                    it.key(directBuffer.position(0));
+
+                    directBuffer.position(ROW_ID_OFFSET);
+                    long msb = directBuffer.getLong();
+                    long lsb = directBuffer.getLong();
+
+                    var rowId = new RowId(partitionId, msb, lsb);
+
+                    seekKeyBuf.putLong(ROW_ID_OFFSET, msb);
+                    seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+                    putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+                    it.seek(seekKeyBuf.position(0));
+
+                    ReadResult readResult = processIterator(it, rowId, timestamp, seekKeyBuf);
+
+                    if (readResult.isEmpty()) {
+                        incrementRowId(seekKeyBuf);
+
+                        seekKeyBuf.position(0);
+                        continue;
+                    }
+
+                    next = readResult;
+                    currentRowId = rowId;
+
+                    return true;
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public ReadResult next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                ReadResult res = next;
+
+                next = null;
+
+                incrementRowId(seekKeyBuf);
+
+                seekKeyBuf.position(0);
+
+                return res;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void close() throws Exception {
+                IgniteUtils.closeAll(it, options);
+            }
+
+            private void incrementRowId(ByteBuffer buf) {
+                long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
+
+                buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+
+                if (lsb != 0L) {
+                    return;
+                }
+
+                long msb = 1 + buf.getLong(ROW_ID_OFFSET);
+
+                buf.putLong(ROW_ID_OFFSET, msb);
+
+                if (msb != 0L) {
+                    return;
+                }
+
+                short partitionId = (short) (1 + buf.getShort(0));
+
+                assert partitionId != 0;
+
+                buf.putShort(0, partitionId);
+            }
+        };
     }
 
     private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
@@ -698,7 +845,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         assert timestamp == null ^ txId == null;
 
         // Set next partition as an upper bound.
-        var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
 
         RocksIterator it = db.newIterator(cf, options);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index 940e8047f9..aa09719b8c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxManager;
@@ -542,7 +543,7 @@ public class VersionedRowStore {
      */
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> pred) {
         // TODO <MUTED> https://issues.apache.org/jira/browse/IGNITE-17309 Transactional support for partition scans
-        Cursor<BinaryRow> delegate = storage.scan(pred, Timestamp.nextVersion());
+        Cursor<ReadResult> delegate = storage.scan(pred, Timestamp.nextVersion());
 
         // TODO asch add tx support IGNITE-15087.
         return new Cursor<BinaryRow>() {
@@ -560,7 +561,7 @@ public class VersionedRowStore {
                 }
 
                 if (delegate.hasNext()) {
-                    cur = delegate.next();
+                    cur = delegate.next().binaryRow();
 
                     return cur != null ? true : hasNext(); // Skip tombstones.
                 }


[ignite-3] 02/02: IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17720
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 36d43ed08741e427c0c8f7093d7c47e6d629d8af
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Sep 27 17:30:03 2022 +0400

    IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
---
 .../TestConcurrentHashMapMvPartitionStorage.java   |  4 ++++
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 11 +++++-----
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 24 ----------------------
 3 files changed, 10 insertions(+), 29 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index be90f55363..f9da2dbab9 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -340,6 +340,10 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
 
             @Override
             public BinaryRow committed(HybridTimestamp timestamp) {
+                if (currentChain == null) {
+                    throw new IllegalStateException();
+                }
+
                 ReadResult read = read(currentChain, timestamp, null, filter);
 
                 if (read.transactionId() == null) {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 9cc60288cd..0b8eabdb49 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -662,7 +662,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new ScanCursor(treeCursor, keyFilter, txId);
+        return new TransactionIdCursor(treeCursor, keyFilter, txId);
     }
 
     @Override
@@ -775,6 +775,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             assert nextRead != null;
 
             ReadResult res = nextRead;
+
             nextRead = null;
 
             return res;
@@ -787,8 +788,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         @Override
         public BinaryRow committed(HybridTimestamp timestamp) {
-            if (iterationExhausted) {
-                throw new NoSuchElementException();
+            if (iterationExhausted || nextRead == null) {
+                throw new IllegalStateException();
             }
 
             VersionChain chain = getCurrentChainFromTreeCursor();
@@ -799,7 +800,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private class ScanCursor implements Cursor<BinaryRow> {
+    private class TransactionIdCursor implements Cursor<BinaryRow> {
         private final IgniteCursor<VersionChain> treeCursor;
 
         private final Predicate<BinaryRow> keyFilter;
@@ -810,7 +811,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         private boolean iterationExhausted = false;
 
-        public ScanCursor(
+        public TransactionIdCursor(
                 IgniteCursor<VersionChain> treeCursor,
                 Predicate<BinaryRow> keyFilter,
                 @Nullable UUID transactionId
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3028d9afe2..994d697561 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -813,30 +813,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             public void close() throws Exception {
                 IgniteUtils.closeAll(it, options);
             }
-
-            private void incrementRowId(ByteBuffer buf) {
-                long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
-
-                buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
-
-                if (lsb != 0L) {
-                    return;
-                }
-
-                long msb = 1 + buf.getLong(ROW_ID_OFFSET);
-
-                buf.putLong(ROW_ID_OFFSET, msb);
-
-                if (msb != 0L) {
-                    return;
-                }
-
-                short partitionId = (short) (1 + buf.getShort(0));
-
-                assert partitionId != 0;
-
-                buf.putShort(0, partitionId);
-            }
         };
     }