You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/09/27 12:23:28 UTC
[ignite-3] branch main updated: IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3d017c4b7f IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)
3d017c4b7f is described below
commit 3d017c4b7f03a71d93e8e2911b7255b86864694b
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Sep 27 15:23:23 2022 +0300
IGNITE-17673 Extended MV partition storage API with methods to help cleaning SQL indices (#1121)
---
.../org/apache/ignite/internal/util/Cursor.java | 14 ++++
.../internal/storage/MvPartitionStorage.java | 7 ++
.../storage/AbstractMvPartitionStorageTest.java | 74 ++++++++++++++++++++++
.../internal/storage/BaseMvStoragesTest.java | 7 ++
.../TestConcurrentHashMapMvPartitionStorage.java | 6 ++
.../mv/AbstractPageMemoryMvPartitionStorage.java | 23 +++++++
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 64 ++++++++++++++-----
7 files changed, 179 insertions(+), 16 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
index e4a6f53a07..aa9f547684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cursor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util;
+import java.util.Collections;
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -27,12 +28,25 @@ import java.util.stream.StreamSupport;
* @param <T> Type of elements.
*/
public interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
+ /** Empty cursor instance. */
+ Cursor<?> EMPTY = fromIterator(Collections.emptyIterator());
+
/** {@inheritDoc} */
@Override
default Iterator<T> iterator() {
// A cursor is its own iterator, so it can be used directly as the source of a for-each loop.
return this;
}
+ /**
+ * Returns the shared empty cursor, typed to the element type requested by the caller.
+ *
+ * <p>No new object is created: every call returns the same {@link #EMPTY} instance.
+ *
+ * @param <T> Type of elements in iterator.
+ * @return Cursor that has no elements.
+ */
+ @SuppressWarnings("unchecked") // Safe: EMPTY is built from an empty iterator and never yields an element of any type.
+ static <T> Cursor<T> empty() {
+ return (Cursor<T>) EMPTY;
+ }
+
/**
* Creates an iterator based cursor.
*
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index b3c2e44949..de9742bf86 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -204,6 +204,13 @@ public interface MvPartitionStorage extends AutoCloseable {
*/
void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException;
+ /**
+ * Scans all versions of a single row.
+ *
+ * <p>NOTE(review): {@code AbstractMvPartitionStorageTest} expects the versions to be returned from newest to oldest, with an
+ * uncommitted write intent (if there is one) coming first - confirm that this is the contract for every implementation.
+ *
+ * @param rowId Row id.
+ * @return Cursor over the versions of the row.
+ * @throws StorageException NOTE(review): presumably thrown if reading from the storage fails - confirm.
+ */
+ Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException;
+
/**
* Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in current transaction
* or already committed in different transaction.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 7b9354e55f..21e01306c8 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -984,6 +986,78 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
assertNull(res.newestCommitTimestamp());
}
+ /**
+ * Checks that {@code scanVersions} returns every committed version of a row, newest first, and that versions belonging to
+ * neighbouring row ids are not returned.
+ */
+ @Test
+ void testScanVersions() throws Exception {
+ RowId rowId = new RowId(PARTITION_ID, 100, 0);
+
+ // Populate storage with several versions for the same row id.
+ List<TestValue> values = new ArrayList<>(List.of(value, value2));
+
+ for (TestValue value : values) {
+ addWrite(rowId, binaryRow(key, value), newTransactionId());
+
+ commitWrite(rowId, clock.now());
+ }
+
+ // Put rows right before (99) and right after (101) the scanned one, so that a scan leaking into neighbours is detected
+ // by the size assertion below.
+ RowId lowRowId = new RowId(PARTITION_ID, 99, 0);
+ RowId highRowId = new RowId(PARTITION_ID, 101, 0);
+
+ List.of(lowRowId, highRowId).forEach(newRowId -> {
+ addWrite(newRowId, binaryRow(key, value), newTransactionId());
+
+ commitWrite(newRowId, clock.now());
+ });
+
+ // Reverse expected values to simplify comparison - they are returned in reversed order, newest to oldest.
+ Collections.reverse(values);
+
+ List<IgniteBiTuple<TestKey, TestValue>> list = toList(storage.scanVersions(rowId));
+
+ assertEquals(values.size(), list.size());
+
+ for (int i = 0; i < list.size(); i++) {
+ IgniteBiTuple<TestKey, TestValue> kv = list.get(i);
+
+ assertEquals(key, kv.getKey());
+
+ assertEquals(values.get(i), kv.getValue());
+ }
+ }
+
+ /**
+ * Checks that {@code scanVersions} returns an uncommitted write intent ahead of the committed version of the same row, and
+ * that versions belonging to neighbouring row ids are not returned.
+ */
+ @Test
+ void testScanVersionsWithWriteIntent() throws Exception {
+ RowId rowId = new RowId(PARTITION_ID, 100, 0);
+
+ // First version is committed.
+ addWrite(rowId, binaryRow(key, value), newTransactionId());
+
+ commitWrite(rowId, clock.now());
+
+ // Second version is deliberately left uncommitted, i.e. it stays a write intent.
+ addWrite(rowId, binaryRow(key, value2), newTransactionId());
+
+ // Put rows right before (99) and right after (101) the scanned one, so that a scan leaking into neighbours is detected
+ // by the size assertion below.
+ RowId lowRowId = new RowId(PARTITION_ID, 99, 0);
+ RowId highRowId = new RowId(PARTITION_ID, 101, 0);
+
+ List.of(lowRowId, highRowId).forEach(newRowId -> {
+ addWrite(newRowId, binaryRow(key, value), newTransactionId());
+
+ commitWrite(newRowId, clock.now());
+ });
+
+ List<IgniteBiTuple<TestKey, TestValue>> list = toList(storage.scanVersions(rowId));
+
+ assertEquals(2, list.size());
+
+ for (int i = 0; i < list.size(); i++) {
+ assertEquals(key, list.get(i).getKey());
+ }
+
+ // The write intent comes first, the committed version after it.
+ assertEquals(value2, list.get(0).getValue());
+
+ assertEquals(value, list.get(1).getValue());
+ }
+
/**
* Returns row id that is lexicographically smaller (by the value of one) than the argument.
*
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index a6a0964ba3..ad7938b8e7 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -125,8 +125,10 @@ public abstract class BaseMvStoragesTest {
* Test pojo key.
*/
protected static class TestKey {
+ @IgniteToStringInclude
public int intKey;
+ @IgniteToStringInclude
public String strKey;
public TestKey() {
@@ -153,6 +155,11 @@ public abstract class BaseMvStoragesTest {
public int hashCode() {
return Objects.hash(intKey, strKey);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ // Delegates to the S helper; presumably it prints the fields marked with @IgniteToStringInclude - confirm.
+ return S.toString(TestKey.class, this);
+ }
}
/**
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 8cade1da54..65103d0ae9 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -307,6 +308,11 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
return ReadResult.EMPTY;
}
+ /** {@inheritDoc} */
+ @Override
+ public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+ // Walks the version chain starting from the entry stored in the map and follows the "next" references until null.
+ // If map.get returns null for the row id, the nonNull check fails right away and the resulting cursor is empty.
+ return Cursor.fromIterator(Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next).map(vc -> vc.row).iterator());
+ }
+
/** {@inheritDoc} */
@Override
public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, UUID txId) {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 22940d9857..96d4843873 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -22,12 +22,14 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.table.HashIndexView;
import org.apache.ignite.configuration.schemas.table.SortedIndexView;
@@ -635,6 +637,27 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
+ /**
+ * Scans all versions of a single row by walking its version chain, starting from the head link.
+ *
+ * <p>Only the chain lookup and the read of the head version happen eagerly. Every following version is read lazily, while the
+ * returned cursor is being iterated.
+ *
+ * @param rowId Row id.
+ * @return Cursor over the row versions in chain order, or an empty cursor if no version chain is found for the row id.
+ * @throws StorageException If the version chain lookup fails.
+ */
+ @Override
+ public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+ try {
+ VersionChain versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
+
+ if (versionChain == null) {
+ return Cursor.empty();
+ }
+
+ RowVersion head = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
+
+ // A zero next link terminates the chain: the null produced here fails the nonNull check and ends the stream.
+ Stream<RowVersion> stream = Stream.iterate(head, Objects::nonNull, rowVersion ->
+ rowVersion.nextLink() == 0 ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE)
+ );
+
+ return Cursor.fromIterator(stream.map(rowVersion -> new ByteBufferRow(rowVersion.value())).iterator());
+ } catch (IgniteInternalCheckedException e) {
+ // Report the failure with the exception type declared by the method instead of a bare RuntimeException.
+ throw new StorageException("Failed to scan row versions: rowId=" + rowId, e);
+ }
+ }
+
@Override
public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
return internalScan(keyFilter, txId, null);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 6cf51b2e30..2d0d46e4be 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -38,6 +38,7 @@ import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -648,6 +649,37 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return rowId.mostSignificantBits() == keyByf.getLong() && rowId.leastSignificantBits() == keyByf.getLong();
}
+ /**
+ * Scans the versions of a single row by iterating over the keys that lie between the row's key prefix (inclusive lower
+ * bound, used for the initial seek) and the key prefix of the incremented row id (iterator upper bound).
+ *
+ * <p>The returned cursor owns the read options and the upper bound slice created here and closes them in its
+ * {@code close()} method, so the caller must close the cursor.
+ *
+ * @param rowId Row id.
+ * @return Cursor over the entries found in that key range, decoded into binary rows.
+ */
+ @Override
+ public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+ // Must be copied before the increment below, because incrementRowId modifies keyBuf in place.
+ byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+
+ incrementRowId(keyBuf);
+
+ Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+ var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+
+ RocksIterator it = db.newIterator(cf, options);
+
+ it.seek(lowerBound);
+
+ return new RocksIteratorAdapter<>(it) {
+ @Override
+ protected BinaryRow decodeEntry(byte[] key, byte[] value) {
+ // NOTE(review): a key of exactly ROW_PREFIX_SIZE bytes presumably denotes a write intent (no timestamp suffix) -
+ // confirm against wrapValueIntoBinaryRow.
+ return wrapValueIntoBinaryRow(value, key.length == ROW_PREFIX_SIZE);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ // Options and the bound slice are released only after super.close() has run.
+ IgniteUtils.closeAll(options, upperBound);
+ }
+ };
+ }
+
//TODO IGNITE-16914 Play with prefix settings and benchmark results.
/** {@inheritDoc} */
@Override
@@ -836,31 +868,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
public void close() throws Exception {
IgniteUtils.closeAll(it, options);
}
+ };
+ }
- private void incrementRowId(ByteBuffer buf) {
- long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
+ /**
+ * Increments, in place, the row id stored in the given key buffer, treating it as one big number with carry.
+ *
+ * <p>The long at {@code ROW_ID_OFFSET + Long.BYTES} (least significant bits) is incremented first. If it wraps around to
+ * zero, the long at {@code ROW_ID_OFFSET} (most significant bits) is incremented. If that one wraps around to zero as well,
+ * the short at offset 0 (partition id) is incremented; an assertion guards against the partition id wrapping to zero.
+ *
+ * @param buf Key buffer to modify.
+ */
+ private void incrementRowId(ByteBuffer buf) {
+ long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
- buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+ buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
- if (lsb != 0L) {
- return;
- }
+ if (lsb != 0L) {
+ return;
+ }
- long msb = 1 + buf.getLong(ROW_ID_OFFSET);
+ long msb = 1 + buf.getLong(ROW_ID_OFFSET);
- buf.putLong(ROW_ID_OFFSET, msb);
+ buf.putLong(ROW_ID_OFFSET, msb);
- if (msb != 0L) {
- return;
- }
+ if (msb != 0L) {
+ return;
+ }
- short partitionId = (short) (1 + buf.getShort(0));
+ short partitionId = (short) (1 + buf.getShort(0));
- assert partitionId != 0;
+ assert partitionId != 0;
- buf.putShort(0, partitionId);
- }
- };
+ buf.putShort(0, partitionId);
}
@Override