You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/31 12:42:13 UTC
[ignite-3] branch main updated: IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d73d91684a IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)
d73d91684a is described below
commit d73d91684adf8b0fed828a18e937455bb9138af6
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jan 31 15:42:07 2023 +0300
IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)
---
.../storage/AbstractMvTableStorageTest.java | 42 ++++++++++++----------
.../ignite/internal/storage/rocksdb/HashIndex.java | 15 +++++++-
.../storage/rocksdb/RocksDbTableStorage.java | 17 ++++++---
.../internal/storage/rocksdb/SortedIndex.java | 11 +++++-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 8 -----
5 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index d88d271621..a99db53a09 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -544,37 +544,30 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
}
@Test
- public void testDestroyTableStorage() throws Exception {
+ public void testDestroyTableStorage() {
MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
- RowId rowId = new RowId(PARTITION_ID);
-
- TableRow tableRow = tableRow(new TestKey(0, "0"), new TestValue(1, "1"));
-
- IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), tableRow, rowId);
- IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), tableRow, rowId);
-
- mvPartitionStorage.runConsistently(() -> {
- mvPartitionStorage.addWriteCommitted(rowId, tableRow, clock.now());
-
- hashIndexStorage.put(hashIndexRow);
-
- sortedIndexStorage.put(sortedIndexRow);
+ List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+ );
- return null;
- });
+ fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
- Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+ Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rows.get(0).get1());
PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+ IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), rows.get(0).get2(), rows.get(0).get1());
+ IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), rows.get(0).get2(), rows.get(0).get1());
+
Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(hashIndexRow.indexColumns());
- Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(hashIndexRow.indexColumns());
+ Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
- tableStorage.destroy().get(1, SECONDS);
+ assertThat(tableStorage.destroy(), willCompleteSuccessfully());
checkStorageDestroyed(mvPartitionStorage);
checkStorageDestroyed(hashIndexStorage);
@@ -590,6 +583,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
// Let's check that nothing will happen if we try to destroy it again.
assertThat(tableStorage.destroy(), willCompleteSuccessfully());
+
+ // Let's check that after restarting the table we will have an empty partition.
+ tableStorage = createMvTableStorage(tableStorage.tablesConfiguration());
+
+ tableStorage.start();
+
+ mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+ sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
}
@Test
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 1d15af522f..9697178356 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.storage.rocksdb;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -56,7 +58,7 @@ class HashIndex {
* Removes all data associated with the index.
*/
void destroy() {
- storages.forEach((partitionId, storage) -> storage.destroy());
+ storages.values().forEach(RocksDbHashIndexStorage::destroy);
}
/**
@@ -83,4 +85,15 @@ class HashIndex {
@Nullable RocksDbHashIndexStorage get(int partitionId) {
return storages.get(partitionId);
}
+
+ /**
+ * Closes all index storages.
+ */
+ void close() {
+ try {
+ IgniteUtils.closeAll(storages.values().stream().map(index -> index::close));
+ } catch (Exception e) {
+ throw new StorageException("Failed to close index storages: " + descriptor.id(), e);
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f6ebf8c7e5..39987f5153 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -346,19 +345,27 @@ public class RocksDbTableStorage implements MvTableStorage {
resources.add(writeOptions);
for (int i = 0; i < partitions.length(); i++) {
- MvPartitionStorage partition = partitions.get(i);
+ RocksDbMvPartitionStorage partition = partitions.get(i);
if (partition != null) {
resources.add(partition::close);
}
}
+ for (HashIndex index : hashIndices.values()) {
+ resources.add(index::close);
+ }
+
+ for (SortedIndex index : sortedIndices.values()) {
+ resources.add(index::close);
+ }
+
Collections.reverse(resources);
try {
IgniteUtils.closeAll(resources);
} catch (Exception e) {
- throw new StorageException("Failed to stop RocksDB table storage.", e);
+ throw new StorageException("Failed to stop RocksDB table storage: " + getTableName(), e);
}
}
@@ -375,8 +382,8 @@ public class RocksDbTableStorage implements MvTableStorage {
IgniteUtils.deleteIfExists(tablePath);
return completedFuture(null);
- } catch (Throwable throwable) {
- return failedFuture(throwable);
+ } catch (Throwable t) {
+ return failedFuture(new StorageException("Failed to destroy RocksDB table storage: " + getTableName(), t));
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index 9c7f425119..df7d14fc9e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.storage.rocksdb;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -90,7 +92,14 @@ class SortedIndex implements ManuallyCloseable {
@Override
public void close() {
- indexCf.handle().close();
+ try {
+ IgniteUtils.closeAll(Stream.concat(
+ storages.values().stream().map(index -> index::close),
+ Stream.of(() -> indexCf.handle().close())
+ ));
+ } catch (Exception e) {
+ throw new StorageException("Failed to close index storages: " + descriptor.id(), e);
+ }
}
/**
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 7bb22da652..4176340dbb 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbSt
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -125,11 +124,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
void storageAdvertisesItIsPersistent() {
assertThat(tableStorage.isVolatile(), is(false));
}
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18523")
- @Override
- public void testDestroyTableStorage() throws Exception {
- super.testDestroyTableStorage();
- }
}