You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/09/27 12:23:28 UTC

[ignite-3] branch main updated: IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d017c4b7f IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)
3d017c4b7f is described below

commit 3d017c4b7f03a71d93e8e2911b7255b86864694b
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Sep 27 15:23:23 2022 +0300

    IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)
---
 .../org/apache/ignite/internal/util/Cursor.java    | 14 ++++
 .../internal/storage/MvPartitionStorage.java       |  7 ++
 .../storage/AbstractMvPartitionStorageTest.java    | 74 ++++++++++++++++++++++
 .../internal/storage/BaseMvStoragesTest.java       |  7 ++
 .../TestConcurrentHashMapMvPartitionStorage.java   |  6 ++
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 23 +++++++
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 64 ++++++++++++++-----
 7 files changed, 179 insertions(+), 16 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
index e4a6f53a07..aa9f547684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -27,12 +28,25 @@ import java.util.stream.StreamSupport;
  * @param <T> Type of elements.
  */
 public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
+    /** Empty cursor instance. */
+    Cursor<?> EMPTY = fromIterator(Collections.emptyIterator());
+
     /** {@inheritDoc} */
     @Override
     default Iterator<T> iterator() {
         return this;
     }
 
+    /**
+     * Creates an empty cursor.
+     *
+     * @param <T> Type of elements in iterator.
+     * @return Cursor.
+     */
+    static <T> Cursor<T> empty() {
+        return (Cursor<T>) EMPTY;
+    }
+
     /**
      * Creates an iterator based cursor.
      *
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index b3c2e44949..de9742bf86 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -204,6 +204,13 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException;
 
+    /**
+     * Scans all versions of a single row.
+     *
+     * @param rowId Row id.
+     */
+    Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException;
+
     /**
      * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in current transaction
      * or already committed in different transaction.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 7b9354e55f..21e01306c8 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
@@ -984,6 +986,78 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertNull(res.newestCommitTimestamp());
     }
 
+    @Test
+    void testScanVersions() throws Exception {
+        RowId rowId = new RowId(PARTITION_ID, 100, 0);
+
+        // Populate storage with several versions for the same row id.
+        List<TestValue> values = new ArrayList<>(List.of(value, value2));
+
+        for (TestValue value : values) {
+            addWrite(rowId, binaryRow(key, value), newTransactionId());
+
+            commitWrite(rowId, clock.now());
+        }
+
+        // Put rows before and after.
+        RowId lowRowId = new RowId(PARTITION_ID, 99, 0);
+        RowId highRowId = new RowId(PARTITION_ID, 101, 0);
+
+        List.of(lowRowId, highRowId).forEach(newRowId ->  {
+            addWrite(newRowId, binaryRow(key, value), newTransactionId());
+
+            commitWrite(newRowId, clock.now());
+        });
+
+        // Reverse expected values to simplify comparison - they are returned in reversed order, newest to oldest.
+        Collections.reverse(values);
+
+        List<IgniteBiTuple<TestKey, TestValue>> list = toList(storage.scanVersions(rowId));
+
+        assertEquals(values.size(), list.size());
+
+        for (int i = 0; i < list.size(); i++) {
+            IgniteBiTuple<TestKey, TestValue> kv = list.get(i);
+
+            assertEquals(key, kv.getKey());
+
+            assertEquals(values.get(i), kv.getValue());
+        }
+    }
+
+    @Test
+    void testScanVersionsWithWriteIntent() throws Exception {
+        RowId rowId = new RowId(PARTITION_ID, 100, 0);
+
+        addWrite(rowId, binaryRow(key, value), newTransactionId());
+
+        commitWrite(rowId, clock.now());
+
+        addWrite(rowId, binaryRow(key, value2), newTransactionId());
+
+        // Put rows before and after.
+        RowId lowRowId = new RowId(PARTITION_ID, 99, 0);
+        RowId highRowId = new RowId(PARTITION_ID, 101, 0);
+
+        List.of(lowRowId, highRowId).forEach(newRowId ->  {
+            addWrite(newRowId, binaryRow(key, value), newTransactionId());
+
+            commitWrite(newRowId, clock.now());
+        });
+
+        List<IgniteBiTuple<TestKey, TestValue>> list = toList(storage.scanVersions(rowId));
+
+        assertEquals(2, list.size());
+
+        for (int i = 0; i < list.size(); i++) {
+            assertEquals(key, list.get(i).getKey());
+        }
+
+        assertEquals(value2, list.get(0).getValue());
+
+        assertEquals(value, list.get(1).getValue());
+    }
+
     /**
      * Returns row id that is lexicographically smaller (by the value of one) than the argument.
      *
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index a6a0964ba3..ad7938b8e7 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -125,8 +125,10 @@ public abstract class BaseMvStoragesTest {
      * Test pojo key.
      */
     protected static class TestKey {
+        @IgniteToStringInclude
         public int intKey;
 
+        @IgniteToStringInclude
         public String strKey;
 
         public TestKey() {
@@ -153,6 +155,11 @@ public abstract class BaseMvStoragesTest {
         public int hashCode() {
             return Objects.hash(intKey, strKey);
         }
+
+        @Override
+        public String toString() {
+            return S.toString(TestKey.class, this);
+        }
     }
 
     /**
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 8cade1da54..65103d0ae9 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -307,6 +308,11 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         return ReadResult.EMPTY;
     }
 
+    @Override
+    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+        return Cursor.fromIterator(Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next).map(vc -> vc.row).iterator());
+    }
+
     /** {@inheritDoc} */
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, UUID txId) {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 22940d9857..96d4843873 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -22,12 +22,14 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.schemas.table.HashIndexView;
 import org.apache.ignite.configuration.schemas.table.SortedIndexView;
@@ -635,6 +637,27 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
+    @Override
+    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+        try {
+            VersionChain versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
+
+            if (versionChain == null) {
+                return Cursor.empty();
+            }
+
+            RowVersion head = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
+
+            Stream<RowVersion> stream = Stream.iterate(head, Objects::nonNull, rowVersion ->
+                    rowVersion.nextLink() == 0 ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE)
+            );
+
+            return Cursor.fromIterator(stream.map(rowVersion -> new ByteBufferRow(rowVersion.value())).iterator());
+        } catch (IgniteInternalCheckedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
         return internalScan(keyFilter, txId, null);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 6cf51b2e30..2d0d46e4be 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -38,6 +38,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -648,6 +649,37 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         return rowId.mostSignificantBits() == keyByf.getLong() && rowId.leastSignificantBits() == keyByf.getLong();
     }
 
+    @Override
+    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+        byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+
+        incrementRowId(keyBuf);
+
+        Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+        var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+
+        RocksIterator it = db.newIterator(cf, options);
+
+        it.seek(lowerBound);
+
+        return new RocksIteratorAdapter<>(it) {
+            @Override
+            protected BinaryRow decodeEntry(byte[] key, byte[] value) {
+                return wrapValueIntoBinaryRow(value, key.length == ROW_PREFIX_SIZE);
+            }
+
+            @Override
+            public void close() throws Exception {
+                super.close();
+
+                IgniteUtils.closeAll(options, upperBound);
+            }
+        };
+    }
+
     //TODO IGNITE-16914 Play with prefix settings and benchmark results.
     /** {@inheritDoc} */
     @Override
@@ -836,31 +868,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             public void close() throws Exception {
                 IgniteUtils.closeAll(it, options);
             }
+        };
+    }
 
-            private void incrementRowId(ByteBuffer buf) {
-                long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
+    private void incrementRowId(ByteBuffer buf) {
+        long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
 
-                buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+        buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
 
-                if (lsb != 0L) {
-                    return;
-                }
+        if (lsb != 0L) {
+            return;
+        }
 
-                long msb = 1 + buf.getLong(ROW_ID_OFFSET);
+        long msb = 1 + buf.getLong(ROW_ID_OFFSET);
 
-                buf.putLong(ROW_ID_OFFSET, msb);
+        buf.putLong(ROW_ID_OFFSET, msb);
 
-                if (msb != 0L) {
-                    return;
-                }
+        if (msb != 0L) {
+            return;
+        }
 
-                short partitionId = (short) (1 + buf.getShort(0));
+        short partitionId = (short) (1 + buf.getShort(0));
 
-                assert partitionId != 0;
+        assert partitionId != 0;
 
-                buf.putShort(0, partitionId);
-            }
-        };
+        buf.putShort(0, partitionId);
     }
 
     @Override