You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/31 12:42:13 UTC

[ignite-3] branch main updated: IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d73d91684a IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)
d73d91684a is described below

commit d73d91684adf8b0fed828a18e937455bb9138af6
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jan 31 15:42:07 2023 +0300

    IGNITE-18523 Destroy RocksDbTableStorage on destroy() (#1605)
---
 .../storage/AbstractMvTableStorageTest.java        | 42 ++++++++++++----------
 .../ignite/internal/storage/rocksdb/HashIndex.java | 15 +++++++-
 .../storage/rocksdb/RocksDbTableStorage.java       | 17 ++++++---
 .../internal/storage/rocksdb/SortedIndex.java      | 11 +++++-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  8 -----
 5 files changed, 59 insertions(+), 34 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index d88d271621..a99db53a09 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -544,37 +544,30 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
     }
 
     @Test
-    public void testDestroyTableStorage() throws Exception {
+    public void testDestroyTableStorage() {
         MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
         HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
         SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
 
-        RowId rowId = new RowId(PARTITION_ID);
-
-        TableRow tableRow = tableRow(new TestKey(0, "0"), new TestValue(1, "1"));
-
-        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), tableRow, rowId);
-        IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), tableRow, rowId);
-
-        mvPartitionStorage.runConsistently(() -> {
-            mvPartitionStorage.addWriteCommitted(rowId, tableRow, clock.now());
-
-            hashIndexStorage.put(hashIndexRow);
-
-            sortedIndexStorage.put(sortedIndexRow);
+        List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        );
 
-            return null;
-        });
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
 
-        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rows.get(0).get1());
         PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
 
+        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), rows.get(0).get2(), rows.get(0).get1());
+        IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), rows.get(0).get2(), rows.get(0).get1());
+
         Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(hashIndexRow.indexColumns());
 
-        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(hashIndexRow.indexColumns());
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
         Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.destroy().get(1, SECONDS);
+        assertThat(tableStorage.destroy(), willCompleteSuccessfully());
 
         checkStorageDestroyed(mvPartitionStorage);
         checkStorageDestroyed(hashIndexStorage);
@@ -590,6 +583,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
         // Let's check that nothing will happen if we try to destroy it again.
         assertThat(tableStorage.destroy(), willCompleteSuccessfully());
+
+        // Let's check that after restarting the table we will have an empty partition.
+        tableStorage = createMvTableStorage(tableStorage.tablesConfiguration());
+
+        tableStorage.start();
+
+        mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
     }
 
     @Test
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 1d15af522f..9697178356 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.storage.rocksdb;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -56,7 +58,7 @@ class HashIndex {
      * Removes all data associated with the index.
      */
     void destroy() {
-        storages.forEach((partitionId, storage) -> storage.destroy());
+        storages.values().forEach(RocksDbHashIndexStorage::destroy);
     }
 
     /**
@@ -83,4 +85,15 @@ class HashIndex {
     @Nullable RocksDbHashIndexStorage get(int partitionId) {
         return storages.get(partitionId);
     }
+
+    /**
+     * Closes all index storages.
+     */
+    void close() {
+        try {
+            IgniteUtils.closeAll(storages.values().stream().map(index -> index::close));
+        } catch (Exception e) {
+            throw new StorageException("Failed to close index storages: " + descriptor.id(), e);
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f6ebf8c7e5..39987f5153 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -346,19 +345,27 @@ public class RocksDbTableStorage implements MvTableStorage {
         resources.add(writeOptions);
 
         for (int i = 0; i < partitions.length(); i++) {
-            MvPartitionStorage partition = partitions.get(i);
+            RocksDbMvPartitionStorage partition = partitions.get(i);
 
             if (partition != null) {
                 resources.add(partition::close);
             }
         }
 
+        for (HashIndex index : hashIndices.values()) {
+            resources.add(index::close);
+        }
+
+        for (SortedIndex index : sortedIndices.values()) {
+            resources.add(index::close);
+        }
+
         Collections.reverse(resources);
 
         try {
             IgniteUtils.closeAll(resources);
         } catch (Exception e) {
-            throw new StorageException("Failed to stop RocksDB table storage.", e);
+            throw new StorageException("Failed to stop RocksDB table storage: " + getTableName(), e);
         }
     }
 
@@ -375,8 +382,8 @@ public class RocksDbTableStorage implements MvTableStorage {
             IgniteUtils.deleteIfExists(tablePath);
 
             return completedFuture(null);
-        } catch (Throwable throwable) {
-            return failedFuture(throwable);
+        } catch (Throwable t) {
+            return failedFuture(new StorageException("Failed to destroy RocksDB table storage: " + getTableName(), t));
         }
     }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index 9c7f425119..df7d14fc9e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.storage.rocksdb;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -90,7 +92,14 @@ class SortedIndex implements ManuallyCloseable {
 
     @Override
     public void close() {
-        indexCf.handle().close();
+        try {
+            IgniteUtils.closeAll(Stream.concat(
+                    storages.values().stream().map(index -> index::close),
+                    Stream.of(() -> indexCf.handle().close())
+            ));
+        } catch (Exception e) {
+            throw new StorageException("Failed to close index storages: " + descriptor.id(), e);
+        }
     }
 
     /**
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 7bb22da652..4176340dbb 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbSt
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -125,11 +124,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18523")
-    @Override
-    public void testDestroyTableStorage() throws Exception {
-        super.testDestroyTableStorage();
-    }
 }