You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/25 13:17:49 UTC
[ignite-3] branch main updated: IGNITE-17575 Create a Hash Index Storage per partition (#1035)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6d3be6f6b8 IGNITE-17575 Create a Hash Index Storage per partition (#1035)
6d3be6f6b8 is described below
commit 6d3be6f6b8d2cf287613eaebe39a8df9c8e23fc7
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Aug 25 16:17:43 2022 +0300
IGNITE-17575 Create a Hash Index Storage per partition (#1035)
---
.../internal/storage/index/IndexRowImpl.java | 45 ++++
.../storage/AbstractMvPartitionStorageTest.java | 10 +-
.../storage/AbstractMvTableStorageTest.java | 124 ++++++++--
.../internal/storage/BaseMvStoragesTest.java | 18 ++
.../internal/storage/rocksdb/HashIndices.java | 46 ++++
.../storage/rocksdb/RocksDbTableStorage.java | 24 +-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 174 ++++++++++++++
.../storage/rocksdb/RocksDbTableStorageTest.java | 258 ---------------------
8 files changed, 407 insertions(+), 292 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowImpl.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowImpl.java
new file mode 100644
index 0000000000..eb8ca70c57
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * {@link IndexRow} implementation that simply stores the provided parameters.
+ */
+public class IndexRowImpl implements IndexRow {
+ private final BinaryTuple indexColumns;
+
+ private final RowId rowId;
+
+ public IndexRowImpl(BinaryTuple indexColumns, RowId rowId) {
+ this.indexColumns = indexColumns;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public BinaryTuple indexColumns() {
+ return indexColumns;
+ }
+
+ @Override
+ public RowId rowId() {
+ return rowId;
+ }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 711cb77504..1929314423 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage;
-import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -39,7 +38,7 @@ import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
-import java.util.stream.StreamSupport;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
@@ -322,7 +321,6 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
Cursor<BinaryRow> cursor = scan(row -> true, txId);
assertTrue(cursor.hasNext());
- //noinspection ConstantConditions
assertTrue(cursor.hasNext());
List<TestValue> res = new ArrayList<>();
@@ -330,13 +328,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
res.add(value(cursor.next()));
assertTrue(cursor.hasNext());
- //noinspection ConstantConditions
assertTrue(cursor.hasNext());
res.add(value(cursor.next()));
assertFalse(cursor.hasNext());
- //noinspection ConstantConditions
assertFalse(cursor.hasNext());
assertThrows(NoSuchElementException.class, () -> cursor.next());
@@ -346,10 +342,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
private List<TestValue> convert(Cursor<BinaryRow> cursor) throws Exception {
try (cursor) {
- return StreamSupport.stream(cursor.spliterator(), false)
+ return cursor.stream()
.map(BaseMvStoragesTest::value)
.sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
- .collect(toList());
+ .collect(Collectors.toList());
}
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index f9364f1051..a15664e83a 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -19,17 +19,31 @@ package org.apache.ignite.internal.storage;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
@@ -40,16 +54,15 @@ import org.junit.jupiter.api.Test;
/**
* Abstract class that contains tests for {@link MvTableStorage} implementations.
*/
-// TODO: Use this to test RocksDB-based storage, see https://issues.apache.org/jira/browse/IGNITE-17318
// TODO: Use this to test B+tree-based storage, see https://issues.apache.org/jira/browse/IGNITE-17320
-public abstract class AbstractMvTableStorageTest {
+public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
private static final String SORTED_INDEX_NAME = "SORTED_IDX";
private static final String HASH_INDEX_NAME = "HASH_IDX";
private static final int PARTITION_ID = 0;
- private MvTableStorage tableStorage;
+ protected MvTableStorage tableStorage;
private UUID sortedIndexId;
@@ -58,9 +71,11 @@ public abstract class AbstractMvTableStorageTest {
protected abstract MvTableStorage tableStorage();
@BeforeEach
- void setUp() {
+ void setUpBase() {
tableStorage = tableStorage();
+ tableStorage.start();
+
createTestTable();
NamedListView<TableIndexView> indexConfiguration = tableStorage.configuration().indices().value();
@@ -70,38 +85,53 @@ public abstract class AbstractMvTableStorageTest {
}
@AfterEach
- void tearDown() {
- tableStorage.destroy();
+ void tearDownBase() {
+ tableStorage.stop();
}
/**
- * Tests creating a partition.
+ * Tests that {@link MvTableStorage#getMvPartition(int)} correctly returns an existing partition.
*/
@Test
- public void testCreatePartition() {
- int partitionId = 0;
+ void testCreatePartition() {
+ MvPartitionStorage absentStorage = tableStorage.getMvPartition(0);
+
+ assertThat(absentStorage, is(nullValue()));
- assertThat(tableStorage.getMvPartition(partitionId), is(nullValue()));
+ MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(0);
- assertThat(tableStorage.getOrCreateMvPartition(partitionId), is(notNullValue()));
+ assertThat(partitionStorage, is(notNullValue()));
- assertThat(tableStorage.getMvPartition(partitionId), is(notNullValue()));
+ assertThat(partitionStorage, is(sameInstance(tableStorage.getMvPartition(0))));
}
/**
- * Tests destroying a partition.
+ * Tests that partition data does not overlap.
*/
@Test
- public void testDestroyPartition() {
- int partitionId = 0;
+ void testPartitionIndependence() throws Exception {
+ MvPartitionStorage partitionStorage0 = tableStorage.getOrCreateMvPartition(42);
+ // Using a shifted ID value to test a multibyte scenario.
+ MvPartitionStorage partitionStorage1 = tableStorage.getOrCreateMvPartition(1 << 8);
+
+ var testData0 = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
+
+ UUID txId = UUID.randomUUID();
- assertThat(tableStorage.getOrCreateMvPartition(partitionId), is(notNullValue()));
+ RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData0, txId));
- assertThat(tableStorage.getMvPartition(partitionId), is(notNullValue()));
+ assertThat(unwrap(partitionStorage0.read(rowId0, txId)), is(equalTo(unwrap(testData0))));
+ assertThat(partitionStorage1.read(rowId0, txId), is(nullValue()));
- assertThat(tableStorage.destroyPartition(partitionId), willCompleteSuccessfully());
+ var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));
- assertThat(tableStorage.getMvPartition(partitionId), is(nullValue()));
+ RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData1, txId));
+
+ assertThat(partitionStorage0.read(rowId1, txId), is(nullValue()));
+ assertThat(unwrap(partitionStorage1.read(rowId1, txId)), is(equalTo(unwrap(testData1))));
+
+ assertThat(toList(partitionStorage0.scan(row -> true, txId)), contains(unwrap(testData0)));
+ assertThat(toList(partitionStorage1.scan(row -> true, txId)), contains(unwrap(testData1)));
}
/**
@@ -144,6 +174,54 @@ public abstract class AbstractMvTableStorageTest {
assertThat(tableStorage.destroyIndex(hashIndexId), willCompleteSuccessfully());
}
+ @Test
+ public void testHashIndexIndependence() {
+ MvPartitionStorage partitionStorage1 = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId), is(notNullValue()));
+ assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIndexId));
+
+ MvPartitionStorage partitionStorage2 = tableStorage.getOrCreateMvPartition(PARTITION_ID + 1);
+
+ HashIndexStorage storage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId);
+ HashIndexStorage storage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIndexId);
+
+ assertThat(storage1, is(notNullValue()));
+ assertThat(storage2, is(notNullValue()));
+
+ var rowId1 = new RowId(PARTITION_ID);
+ var rowId2 = new RowId(PARTITION_ID + 1);
+
+ BinaryTupleSchema schema = BinaryTupleSchema.create(new Element[] {
+ new Element(NativeTypes.INT32, false),
+ new Element(NativeTypes.INT32, false)
+ });
+
+ ByteBuffer buffer = BinaryTupleBuilder.create(schema)
+ .appendInt(1)
+ .appendInt(2)
+ .build();
+
+ BinaryTuple tuple = new BinaryTuple(schema, buffer);
+
+ partitionStorage1.runConsistently(() -> {
+ storage1.put(new IndexRowImpl(tuple, rowId1));
+
+ return null;
+ });
+
+ partitionStorage2.runConsistently(() -> {
+ storage2.put(new IndexRowImpl(tuple, rowId2));
+
+ return null;
+ });
+
+ assertThat(getAll(storage1.get(tuple)), contains(rowId1));
+ assertThat(getAll(storage2.get(tuple)), contains(rowId2));
+
+ assertThat(tableStorage.destroyIndex(sortedIndexId), willCompleteSuccessfully());
+ }
+
/**
* Tests that exceptions are thrown if indices are not configured correctly.
*/
@@ -219,4 +297,12 @@ public abstract class AbstractMvTableStorageTest {
assertThat(createTableFuture, willCompleteSuccessfully());
}
+
+ private static <T> List<T> getAll(Cursor<T> cursor) {
+ try (cursor) {
+ return cursor.stream().collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 461c856874..93ebe0f830 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.storage;
+import java.util.List;
import java.util.Locale;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -30,6 +32,8 @@ import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshal
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
@@ -103,6 +107,20 @@ public abstract class BaseMvStoragesTest {
}
}
+ protected static @Nullable IgniteBiTuple<TestKey, TestValue> unwrap(@Nullable BinaryRow binaryRow) {
+ if (binaryRow == null) {
+ return null;
+ }
+
+ return new IgniteBiTuple<>(key(binaryRow), value(binaryRow));
+ }
+
+ protected static List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<BinaryRow> cursor) throws Exception {
+ try (cursor) {
+ return cursor.stream().map(BaseMvStoragesTest::unwrap).collect(Collectors.toList());
+ }
+ }
+
/**
* Test pojo key.
*/
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java
new file mode 100644
index 0000000000..4b0df30e7e
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+
+class HashIndices {
+ private final HashIndexDescriptor descriptor;
+
+ private final ConcurrentMap<Integer, HashIndexStorage> storages = new ConcurrentHashMap<>();
+
+ HashIndices(HashIndexDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ HashIndexStorage getOrCreateStorage(ColumnFamily indexCf, RocksDbMvPartitionStorage partitionStorage) {
+ return storages.computeIfAbsent(
+ partitionStorage.partitionId(),
+ partId -> new RocksDbHashIndexStorage(descriptor, indexCf, partitionStorage)
+ );
+ }
+
+ void destroy() {
+ storages.forEach((partitionId, storage) -> storage.destroy());
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 8af7fbb63f..2496f3c661 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -111,8 +111,8 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Partition storages. */
private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
- /** Hash Index storages by their IDs. */
- private final ConcurrentMap<UUID, HashIndexStorage> hashIndices = new ConcurrentHashMap<>();
+ /** Hash Index storages by Index IDs. */
+ private final ConcurrentMap<UUID, HashIndices> hashIndices = new ConcurrentHashMap<>();
/** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
@@ -447,23 +447,31 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
- return hashIndices.computeIfAbsent(indexId, id -> {
- var indexDescriptor = new HashIndexDescriptor(id, tableCfg.value());
+ HashIndices storages = hashIndices.computeIfAbsent(indexId, id -> {
+ var indexDescriptor = new HashIndexDescriptor(indexId, tableCfg.value());
- return new RocksDbHashIndexStorage(indexDescriptor, hashIndexCf, getOrCreateMvPartition(partitionId));
+ return new HashIndices(indexDescriptor);
});
+
+ RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition %d has not been created yet", partitionId));
+ }
+
+ return storages.getOrCreateStorage(hashIndexCf, partitionStorage);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> destroyIndex(UUID indexId) {
- HashIndexStorage storage = hashIndices.remove(indexId);
+ HashIndices storages = hashIndices.remove(indexId);
- if (storage == null) {
+ if (storages == null) {
return CompletableFuture.completedFuture(null);
}
- storage.destroy();
+ storages.destroy();
return awaitFlush(false);
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
new file mode 100644
index 0000000000..e51591470b
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Path;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for the {@link RocksDbTableStorage}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
+ private RocksDbStorageEngine engine;
+
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16536, writeBufferSize = 16536}}")
+ private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
+
+ @InjectConfiguration(
+ name = "table",
+ value = "mock { partitions = 512, dataStorage.name = rocksdb }",
+ polymorphicExtensions = {
+ RocksDbDataStorageConfigurationSchema.class,
+ SortedIndexConfigurationSchema.class,
+ HashIndexConfigurationSchema.class,
+ NullValueDefaultConfigurationSchema.class,
+ UnlimitedBudgetConfigurationSchema.class
+ }
+ )
+ private TableConfiguration tableCfg;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected MvTableStorage tableStorage() {
+ engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
+
+ engine.start();
+
+ MvTableStorage storage = engine.createMvTable(tableCfg);
+
+ assertThat(storage, is(instanceOf(RocksDbTableStorage.class)));
+
+ return storage;
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ engine == null ? null : engine::stop
+ );
+ }
+
+ /**
+ * Tests that dropping a partition does not remove extra data.
+ */
+ @Test
+ void testDropPartition() {
+ var testData = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
+
+ UUID txId = UUID.randomUUID();
+
+ MvPartitionStorage partitionStorage0 = tableStorage.getOrCreateMvPartition(42);
+
+ RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData, txId));
+
+ MvPartitionStorage partitionStorage1 = tableStorage.getOrCreateMvPartition(1 << 8);
+
+ RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData, txId));
+
+ CompletableFuture<Void> destroyFuture = tableStorage.destroyPartition(42);
+
+ // Partition destruction doesn't enforce flush.
+ ((RocksDbTableStorage) tableStorage).scheduleFlush();
+
+ assertThat(destroyFuture, willCompleteSuccessfully());
+
+ assertThat(tableStorage.getMvPartition(42), is(nullValue()));
+ assertThat(tableStorage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));
+ assertThat(unwrap(tableStorage.getMvPartition(1 << 8).read(rowId1, txId)), is(equalTo(unwrap(testData))));
+ }
+
+ /**
+ * Tests that restarting the storage does not result in data loss.
+ */
+ @Test
+ void testRestart() {
+ var testData = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
+
+ UUID txId = UUID.randomUUID();
+
+ MvPartitionStorage partitionStorage0 = tableStorage.getOrCreateMvPartition(0);
+
+ RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData, txId));
+
+ tableStorage.stop();
+
+ tableStorage = engine.createMvTable(tableCfg);
+
+ tableStorage.start();
+
+ assertThat(tableStorage.getMvPartition(0), is(notNullValue()));
+ assertThat(tableStorage.getMvPartition(1), is(nullValue()));
+ assertThat(unwrap(tableStorage.getMvPartition(0).read(rowId0, txId)), is(equalTo(unwrap(testData))));
+ }
+
+ @Test
+ void storageAdvertisesItIsPersistent() {
+ assertThat(tableStorage.isVolatile(), is(false));
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
+ @Override
+ public void testCreateSortedIndex() {
+ super.testCreateSortedIndex();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
+ @Override
+ public void testDestroyIndex() {
+ super.testDestroyIndex();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17318")
+ @Override
+ public void testMisconfiguredIndices() {
+ super.testMisconfiguredIndices();
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
deleted file mode 100644
index 5dc8bd36da..0000000000
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb;
-
-import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.sameInstance;
-
-import java.nio.file.Path;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.storage.BaseMvStoragesTest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/**
- * Tests for the {@link RocksDbTableStorage}.
- */
-@ExtendWith(WorkDirectoryExtension.class)
-@ExtendWith(ConfigurationExtension.class)
-public class RocksDbTableStorageTest extends BaseMvStoragesTest {
- private RocksDbStorageEngine engine;
-
- private RocksDbTableStorage storage;
-
- @BeforeEach
- public void setUp(
- @WorkDirectory Path workDir,
- @InjectConfiguration(
- value = "mock {flushDelayMillis = 0, defaultRegion {size = 16536, writeBufferSize = 16536}}"
- ) RocksDbStorageEngineConfiguration rocksDbEngineConfig,
- @InjectConfiguration(
- name = "table",
- value = "mock.partitions = 512",
- polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- RocksDbDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- UnlimitedBudgetConfigurationSchema.class,
- EntryCountBudgetConfigurationSchema.class
- }
- ) TableConfiguration tableCfg
- ) throws Exception {
- CompletableFuture<Void> changeDataStorageFuture = tableCfg.dataStorage().change(c -> c.convert(RocksDbDataStorageChange.class));
-
- assertThat(changeDataStorageFuture, willBe(nullValue(Void.class)));
-
- assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
-
- engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
-
- engine.start();
-
- storage = engine.createMvTable(tableCfg);
-
- assertThat(storage, is(instanceOf(RocksDbTableStorage.class)));
-
- storage.start();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage == null ? null : storage::stop,
- engine == null ? null : engine::stop
- );
- }
-
- /**
- * Tests that {@link RocksDbTableStorage#getMvPartition(int)} correctly returns an existing partition.
- */
- @Test
- void testCreatePartition() {
- MvPartitionStorage absentStorage = storage.getMvPartition(0);
-
- assertThat(absentStorage, is(nullValue()));
-
- MvPartitionStorage partitionStorage = storage.getOrCreateMvPartition(0);
-
- assertThat(partitionStorage, is(notNullValue()));
-
- var testData = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
-
- partitionStorage.runConsistently(() -> {
- UUID txId = UUID.randomUUID();
-
- RowId rowId = partitionStorage.insert(testData, txId);
-
- assertThat(partitionStorage, is(sameInstance(storage.getMvPartition(0))));
-
- assertThat(unwrap(partitionStorage.read(rowId, txId)), is(equalTo(unwrap(testData))));
-
- return null;
- });
- }
-
- /**
- * Tests that partition data does not overlap.
- */
- @Test
- void testPartitionIndependence() throws Exception {
- MvPartitionStorage partitionStorage0 = storage.getOrCreateMvPartition(42);
- // Using a shifted ID value to test a multibyte scenario.
- MvPartitionStorage partitionStorage1 = storage.getOrCreateMvPartition(1 << 8);
-
- var testData0 = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
-
- UUID txId = UUID.randomUUID();
-
- RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData0, txId));
-
- assertThat(unwrap(partitionStorage0.read(rowId0, txId)), is(equalTo(unwrap(testData0))));
- assertThat(partitionStorage1.read(rowId0, txId), is(nullValue()));
-
- var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));
-
- RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData1, txId));
-
- assertThat(partitionStorage0.read(rowId1, txId), is(nullValue()));
- assertThat(unwrap(partitionStorage1.read(rowId1, txId)), is(equalTo(unwrap(testData1))));
-
- assertThat(toList(partitionStorage0.scan(row -> true, txId)), contains(unwrap(testData0)));
- assertThat(toList(partitionStorage1.scan(row -> true, txId)), contains(unwrap(testData1)));
- }
-
- private List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<BinaryRow> cursor) throws Exception {
- try (cursor) {
- return cursor.stream().map(RocksDbTableStorageTest::unwrap).collect(Collectors.toList());
- }
- }
-
- /**
- * Tests that dropping a partition does not remove extra data.
- */
- @Test
- void testDropPartition() {
- var testData = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
-
- UUID txId = UUID.randomUUID();
-
- MvPartitionStorage partitionStorage0 = storage.getOrCreateMvPartition(42);
-
- RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData, txId));
-
- MvPartitionStorage partitionStorage1 = storage.getOrCreateMvPartition(1 << 8);
-
- RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData, txId));
-
- CompletableFuture<Void> destroyFuture = storage.destroyPartition(42);
-
- // Partition desctuction doesn't enforce flush.
- storage.scheduleFlush();
-
- assertThat(destroyFuture, willCompleteSuccessfully());
-
- assertThat(storage.getMvPartition(42), is(nullValue()));
- assertThat(storage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));
- assertThat(unwrap(storage.getMvPartition(1 << 8).read(rowId1, txId)), is(equalTo(unwrap(testData))));
- }
-
- /**
- * Tests that restarting the storage does not result in data loss.
- */
- @Test
- void testRestart(
- @InjectConfiguration(
- name = "table",
- polymorphicExtensions = {HashIndexConfigurationSchema.class, RocksDbDataStorageConfigurationSchema.class}
- ) TableConfiguration tableCfg
- ) throws Exception {
- var testData = binaryRow(new TestKey(1, "1"), new TestValue(10, "10"));
-
- UUID txId = UUID.randomUUID();
-
- MvPartitionStorage partitionStorage0 = storage.getOrCreateMvPartition(0);
-
- RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData, txId));
-
- storage.stop();
-
- tableCfg.dataStorage().change(c -> c.convert(RocksDbDataStorageChange.class)).get(1, TimeUnit.SECONDS);
-
- storage = engine.createMvTable(tableCfg);
-
- storage.start();
-
- assertThat(storage.getMvPartition(0), is(notNullValue()));
- assertThat(storage.getMvPartition(1), is(nullValue()));
- assertThat(unwrap(storage.getMvPartition(0).read(rowId0, txId)), is(equalTo(unwrap(testData))));
- }
-
- @Test
- void storageAdvertisesItIsPersistent() {
- assertThat(storage.isVolatile(), is(false));
- }
-
- private static @Nullable IgniteBiTuple<TestKey, TestValue> unwrap(@Nullable BinaryRow binaryRow) {
- if (binaryRow == null) {
- return null;
- }
-
- return new IgniteBiTuple<>(key(binaryRow), value(binaryRow));
- }
-}