Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/11/22 14:50:35 UTC

[GitHub] [ignite-3] rpuch commented on a diff in pull request #1325: IGNITE-17132 [Native Persistence 3.0] Implement partition destruction for persistent PageMemory

rpuch commented on code in PR #1325:
URL: https://github.com/apache/ignite-3/pull/1325#discussion_r1029107559


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -65,13 +66,17 @@ public class Compactor extends IgniteWorker {
 
     private final @Nullable ThreadPoolExecutor threadPoolExecutor;
 
-    private final AtomicInteger deltaFileCount = new AtomicInteger();
+    /** Guarded by {@link #mux}. */
+    private boolean addedDeltaFiles;
 
     private final FilePageStoreManager filePageStoreManager;
 
     /** Thread local with buffers for the compaction threads. */
     private final ThreadLocal<ByteBuffer> threadBuf;
 
+    /** Partitions currently being processed, for example, writes dirty pages to delta file. */

Review Comment:
   Do we still need the words 'for example' here? Don't we know exactly what 'processing' means in this case?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and

Review Comment:
   'Recommended' implies that it is nice to have, but not actually mandatory. In this case, is it mandatory or not? Also, could you please elaborate in the javadoc on what kind of synchronization is needed?
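For illustration only, here is one way the required synchronization could be stated and enforced. This is a sketch under assumptions: `SimpleProcessingCounter` and `SimpleProcessingCounterMap` are hypothetical, simplified stand-ins for the classes in the diff, keyed by a `String` instead of `GroupPartitionId`. The counter is deliberately not thread-safe. Every call for a given key runs inside `ConcurrentHashMap.compute`, which the JDK documents as atomic per key, and a finished counter is removed in that same atomic step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical counter: NOT thread-safe by itself, callers must serialize all calls on one instance. */
final class SimpleProcessingCounter {
    /** Guarded by the caller (here: ConcurrentHashMap.compute on this counter's key). */
    private int counter;

    private final CompletableFuture<Void> future = new CompletableFuture<>();

    void onStart() {
        assert !future.isDone() : "Counter reused after completion";
        counter++;
    }

    void onFinish() {
        assert counter > 0 : "More finishes than starts";
        if (--counter == 0) {
            future.complete(null);
        }
    }

    CompletableFuture<Void> future() {
        return future;
    }
}

/** Hypothetical map: serializes all calls for one key by running them inside compute(). */
final class SimpleProcessingCounterMap {
    private final ConcurrentMap<String, SimpleProcessingCounter> counters = new ConcurrentHashMap<>();

    /** Starts processing and returns the future that completes when all processing for the key ends. */
    CompletableFuture<Void> onStart(String key) {
        SimpleProcessingCounter counter = counters.compute(key, (k, existing) -> {
            SimpleProcessingCounter c = existing == null ? new SimpleProcessingCounter() : existing;
            c.onStart();
            return c;
        });
        return counter.future();
    }

    void onFinish(String key) {
        counters.compute(key, (k, existing) -> {
            if (existing == null) {
                throw new IllegalStateException("No processing in progress for " + k);
            }
            existing.onFinish();
            // Removing a completed counter atomically guarantees the next onStart() gets a fresh future.
            return existing.future().isDone() ? null : existing;
        });
    }

    boolean isProcessing(String key) {
        return counters.containsKey(key);
    }
}
```

With a contract like this, the javadoc can state exactly what is required (all calls for one counter must be serialized by the caller) instead of recommending synchronization.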



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java:
##########
@@ -0,0 +1,90 @@
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for thread-safe work with {@link PartitionProcessingCounter} for any partition of any group.
+ */
+public class PartitionProcessingCounterMap {
+    private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> processedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Callback at the beginning of checkpoint processing of a partition, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onStartPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            if (partitionProcessingCounter == null) {
+                PartitionProcessingCounter counter = new PartitionProcessingCounter();
+
+                counter.onStartPartitionProcessing();
+
+                return counter;
+            }
+
+            partitionProcessingCounter.onStartPartitionProcessing();
+
+            return partitionProcessingCounter;
+        });
+    }

Review Comment:
   ```suggestion
           processedPartitions.computeIfAbsent(groupPartitionId, key -> new PartitionProcessingCounter()).onStartPartitionProcessing();
       }
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();

Review Comment:
   Looks like `queue` is only used for enqueueing (via `add()`) and dequeueing (via `poll()`), so `Queue` is sufficient as the declared type. I suggest changing it accordingly.
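A small sketch of the pattern being discussed, assuming plain `String` items as stand-ins for delta files and a hypothetical `QueueDrainSketch.drain` helper. The queue is declared as `Queue` because only `poll()` (plus the initial fill) is needed, while the concrete `ConcurrentLinkedQueue` still provides the thread safety that the concurrent workers rely on.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueDrainSketch {
    /** Drains all items with the given number of workers and returns how many items were processed. */
    static int drain(List<String> items, int threads) {
        // The interface type is enough: nothing below depends on ConcurrentLinkedQueue-specific methods.
        Queue<String> queue = new ConcurrentLinkedQueue<>(items);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[threads];

            for (int i = 0; i < threads; i++) {
                futures[i] = CompletableFuture.runAsync(() -> {
                    // Each worker keeps polling until the shared queue is empty.
                    while (queue.poll() != null) {
                        processed.incrementAndGet(); // stand-in for merging one delta file
                    }
                }, pool);
            }

            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }

        return processed.get();
    }
}
```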



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java:
##########
@@ -0,0 +1,90 @@
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for thread-safe work with {@link PartitionProcessingCounter} for any partition of any group.
+ */
+public class PartitionProcessingCounterMap {
+    private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> processedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Callback at the beginning of checkpoint processing of a partition, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onStartPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            if (partitionProcessingCounter == null) {
+                PartitionProcessingCounter counter = new PartitionProcessingCounter();
+
+                counter.onStartPartitionProcessing();
+
+                return counter;
+            }
+
+            partitionProcessingCounter.onStartPartitionProcessing();
+
+            return partitionProcessingCounter;
+        });
+    }
+
+    /**
+     * Callback on completion of partition processing, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onFinishPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            assert partitionProcessingCounter != null : id;
+            assert !partitionProcessingCounter.future().isDone() : id;
+
+            partitionProcessingCounter.onFinishPartitionProcessing();
+
+            return partitionProcessingCounter.future().isDone() ? null : partitionProcessingCounter;
+        });
+    }
+
+    /**
+     * Returns the future if the partition according to the given parameters is currently being processed, for example, dirty pages are
+     * being written or fsync is being done, {@code null} if the partition is not currently being processed.
+     *
+     * <p>Future will be added on {@link #onStartPartitionProcessing(int, int)} call and completed on
+     * {@link #onFinishPartitionProcessing(int, int)} call (equal to the number of {@link #onFinishPartitionProcessing(int, int)} calls).
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    @Nullable
+    public CompletableFuture<Void> getProcessedPartitionFuture(int groupId, int partitionId) {
+        PartitionProcessingCounter partitionProcessingCounter = processedPartitions.get(new GroupPartitionId(groupId, partitionId));
+
+        return partitionProcessingCounter == null ? null : partitionProcessingCounter.future();

Review Comment:
   It seems weird to return a null future. According to the usages, this future is just waited on (if not null). How about creating a static field with a completed future and returning it instead of null? The caller would then start waiting on it, immediately unblock and proceed, so the effect would be the same while avoiding a null check (hence simplifying the caller code). It also leaves fewer opportunities for an NPE.
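A minimal sketch of the completed-future alternative, assuming a simplified, hypothetical `ProcessedPartitionFutures` class keyed by a `String` and without the counting logic of `PartitionProcessingCounter`. `Map.getOrDefault` returns a shared, already completed future when the partition is not being processed, so callers can always `join()` without a null check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ProcessedPartitionFutures {
    /** Shared, already completed future returned when nothing is being processed. */
    private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);

    private final ConcurrentMap<String, CompletableFuture<Void>> inProgress = new ConcurrentHashMap<>();

    void startProcessing(String partitionKey) {
        inProgress.putIfAbsent(partitionKey, new CompletableFuture<>());
    }

    void finishProcessing(String partitionKey) {
        CompletableFuture<Void> future = inProgress.remove(partitionKey);

        if (future != null) {
            future.complete(null);
        }
    }

    /** Never returns null: waiting on the result is always safe. */
    CompletableFuture<Void> getProcessedPartitionFuture(String partitionKey) {
        return inProgress.getOrDefault(partitionKey, COMPLETED_FUTURE);
    }
}
```

One caveat of sharing a single instance: `CompletableFuture` can still be mutated through methods such as `obtrudeValue`, so returning `CompletableFuture.completedFuture(null)` on each call is the safer option if callers cannot be trusted, at the cost of an allocation.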



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -230,78 +269,86 @@ public long allocatePage(int grpId, int partId, byte flags) throws IgniteInterna
     }
 
     /**
-     * Initializing the file page stores for a group.
+     * Initialization of the page storage for the group partition.
      *
      * @param tableName Table name.
      * @param tableId Integer table id.
-     * @param partitions Partition number, must be greater than {@code 0} and less {@link PageIdAllocator#MAX_PARTITION_ID}.
+     * @param partitionId Partition ID, must be between {@code 0} (inclusive) and {@link PageIdAllocator#MAX_PARTITION_ID} (inclusive).
      * @throws IgniteInternalCheckedException If failed.
      */
-    public void initialize(String tableName, int tableId, int partitions) throws IgniteInternalCheckedException {
-        assert partitions > 0 && partitions < MAX_PARTITION_ID : partitions;
+    public void initialize(String tableName, int tableId, int partitionId) throws IgniteInternalCheckedException {
+        assert partitionId >= 0 && partitionId <= MAX_PARTITION_ID : partitionId;
 
-        initGroupDirLock.lock(tableId);
+        stripedLock.lock(tableId + partitionId);
 
         try {
-            if (!groupPageStores.containsPageStores(tableId)) {
-                List<FilePageStore> partitionFilePageStores = createFilePageStores(tableId, partitions);
+            if (!groupPageStores.contains(tableId, partitionId)) {
+                Path tableWorkDir = ensureGroupWorkDir(tableId);

Review Comment:
   Looks like two terminologies are mixed: the old (Ignite 2?) terminology of 'group' and the new one, where it is 'table' (because we seem to map tables to groups 1-to-1). Should we stick to one terminology everywhere in `pagememory`?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -75,52 +93,44 @@ public class FilePageStoreManager implements PageReadWriteManager {
     /** Group directory prefix. */
     public static final String GROUP_DIR_PREFIX = "table-";
 
-    /** Logger. */
-    private final IgniteLogger log;
-
     /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */
     private final Path dbDir;
 
     /** Page size in bytes. */
     private final int pageSize;
 
-    /**
-     * Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store.
-     */
+    /** Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store. */
     private final LongOperationAsyncExecutor cleanupAsyncExecutor;
 
-    /** Mapping: group ID -> page store list. */
+    /** Mapping: group ID -> group page stores. */
     private final GroupPageStoresMap<FilePageStore> groupPageStores;
 
-    /** Group directory initialization lock. */
-    private final IgniteStripedLock initGroupDirLock = new IgniteStripedLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));
+    /** Striped lock. */
+    private final IgniteStripedLock stripedLock = new IgniteStripedLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));

Review Comment:
   This lock is only used for initialization. I think the code would be clearer if we named it accordingly, like `initPartitionLock`.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java:
##########
@@ -188,6 +189,23 @@ void testReadWritePartitionMeta(@WorkDirectory Path workDir) throws Exception {
         }
     }
 
+    @Test
+    void testRemoveMeta() {
+        PartitionMetaManager manager = new PartitionMetaManager(ioRegistry, PAGE_SIZE);
+
+        GroupPartitionId id = new GroupPartitionId(0, 0);
+
+        assertDoesNotThrow(() -> manager.removeMeta(id));

Review Comment:
   It looks like we don't need to explicitly wrap this call in `assertDoesNotThrow()`: if an exception is thrown, it fails the test anyway, and this is not the last operation of the test. It is just called for its side effects, which are used later.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java:
##########
@@ -358,19 +347,133 @@ void testAllPageStores() throws Exception {
 
         manager.start();
 
-        manager.initialize("test0", 1, 1);
-        manager.initialize("test1", 2, 1);
+        manager.initialize("test0", 1, 0);
+        manager.initialize("test1", 2, 0);
+
+        List<Path> allPageStoreFiles = manager.allPageStores().stream()
+                .map(GroupPageStores::getAll)
+                .flatMap(Collection::stream)
+                .map(PartitionPageStore::pageStore)
+                .map(FilePageStore::filePath)
+                .collect(toList());
 
         assertThat(
-                manager.allPageStores().stream().flatMap(List::stream).map(FilePageStore::filePath).collect(toList()),
+                allPageStoreFiles,
                 containsInAnyOrder(
                         workDir.resolve("db/table-1").resolve("part-0.bin"),
                         workDir.resolve("db/table-2").resolve("part-0.bin")
                 )
         );
     }
 
+    @Test
+    void testOnPartitionDestruction() throws Exception {
+        FilePageStoreManager manager = createManager();
+
+        manager.start();
+
+        manager.initialize("test0", 0, 0);
+        manager.initialize("test1", 1, 0);
+
+        FilePageStore filePageStore0 = manager.getStore(0, 0);
+        FilePageStore filePageStore1 = manager.getStore(1, 0);
+
+        filePageStore0.ensure();

Review Comment:
   This should not be addressed in this PR, but I'm curious: why is the method named 'ensure()'? It looks like something is missing; I would expect 'ensureSomething()', like 'ensureInitialized()'.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -443,4 +461,50 @@ public Path tmpDeltaFilePageStorePath(int groupId, int partitionId, int index) {
     public Path deltaFilePageStorePath(int groupId, int partitionId, int index) {
         return dbDir.resolve(GROUP_DIR_PREFIX + groupId).resolve(String.format(PART_DELTA_FILE_TEMPLATE, partitionId, index));
     }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>Deletes the partition pages tore and all its delta files. Before that, it creates a marker file (for example,

Review Comment:
   ```suggestion
        * <p>Deletes the partition pages store and all its delta files. Before that, it creates a marker file (for example,
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
 
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {
+                break;
+            }
+
+            updateHeartbeat();
 
-        assert !queue.isEmpty();
+            int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
 
-        updateHeartbeat();
+            CompletableFuture<?>[] futures = new CompletableFuture[threads];
 
-        int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
+            for (int i = 0; i < threads; i++) {
+                CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
 
-        CompletableFuture<?>[] futures = new CompletableFuture[threads];
+                Runnable merger = () -> {
+                    DeltaFileToCompaction toMerge;
 
-        for (int i = 0; i < threads; i++) {
-            CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
+                    try {
+                        while ((toMerge = queue.poll()) != null) {
+                            GroupPartitionId partitionId = new GroupPartitionId(toMerge.groupId, toMerge.partitionId);
+
+                            processedPartitionMap.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());

Review Comment:
   Looks like `onStartPartitionProcessing()` and `onFinishPartitionProcessing()` are always called with a group ID and a partition ID taken from a `GroupPartitionId` instance. How about making these two methods (and the methods they call in `PartitionProcessingCounter`) accept `GroupPartitionId` directly, avoiding this unpacking?
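A sketch of what accepting the key object could look like, assuming a hypothetical `PartitionKey` value class in place of `GroupPartitionId` and plain integer counts instead of `PartitionProcessingCounter`. Call sites pass the key they already hold, and the map never has to rebuild it from two ints.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical value class standing in for GroupPartitionId. */
final class PartitionKey {
    final int groupId;
    final int partitionId;

    PartitionKey(int groupId, int partitionId) {
        this.groupId = groupId;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionKey)) {
            return false;
        }
        PartitionKey that = (PartitionKey) o;
        return groupId == that.groupId && partitionId == that.partitionId;
    }

    @Override
    public int hashCode() {
        return 31 * groupId + partitionId;
    }
}

final class ProcessingCounts {
    private final ConcurrentMap<PartitionKey, Integer> counts = new ConcurrentHashMap<>();

    /** Accepts the key as-is: no unpacking into (groupId, partitionId) and re-packing. */
    void onStartPartitionProcessing(PartitionKey id) {
        counts.merge(id, 1, Integer::sum);
    }

    void onFinishPartitionProcessing(PartitionKey id) {
        // Returning null from computeIfPresent removes the entry once the count drops to zero.
        counts.computeIfPresent(id, (k, c) -> c == 1 ? null : c - 1);
    }

    boolean isProcessing(PartitionKey id) {
        return counts.containsKey(id);
    }
}
```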



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -59,6 +60,10 @@ public interface MvTableStorage {
     /**
      * Destroys a partition and all associated indices.
      *
+     * <p>This method will do nothing if there is no partition by ID, when trying to call methods to read or write (as well as all previous

Review Comment:
   I suggest splitting this sentence into two to make it clearer:
   
   1. The first says that destruction is idempotent.
   2. The second says that **after calling this method** any read/write/... will throw.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and
+ * the {@link #future future}.
+ */
+public class PartitionProcessingCounter {
+    private static final VarHandle COUNTER;
+
+    static {
+        try {
+            COUNTER = MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class, "counter", int.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /** Partition processing counter must be greater than or equal to zero. */
+    @SuppressWarnings("unused")
+    private volatile int counter;
+
+    /** Future that will be completed when the {@link #counter} is zero. */
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    /**
+     * Callback at the start of partition processing.
+     */
+    public void onStartPartitionProcessing() {
+        assert !future.isDone();
+
+        int updatedValue = (int) COUNTER.getAndAdd(this, 1) + 1;

Review Comment:
   Looks analogous to `counter.incrementAndGet()` (when using `AtomicInteger`).
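To make the comparison concrete, a sketch with two hypothetical classes: `VarHandleCounter` uses the same `(int) COUNTER.getAndAdd(this, 1) + 1` idiom as the diff, and `AtomicIntegerCounter` uses `AtomicInteger.incrementAndGet()`. Both atomically increment the counter and return the new value.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicInteger;

final class VarHandleCounter {
    private static final VarHandle COUNTER;

    static {
        try {
            COUNTER = MethodHandles.lookup().findVarHandle(VarHandleCounter.class, "counter", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Accessed only through COUNTER. */
    @SuppressWarnings("unused")
    private volatile int counter;

    /** Atomically adds 1 and returns the updated value, like AtomicInteger#incrementAndGet(). */
    int incrementAndGet() {
        return (int) COUNTER.getAndAdd(this, 1) + 1;
    }
}

final class AtomicIntegerCounter {
    private final AtomicInteger counter = new AtomicInteger();

    int incrementAndGet() {
        return counter.incrementAndGet();
    }
}
```

The usual argument for the `VarHandle` form is that the counter stays a plain `volatile int` field instead of a separate `AtomicInteger` object per instance; the cost is the static lookup boilerplate.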



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -419,11 +505,69 @@ private static void createTestTable(TableConfiguration tableConfig) {
         assertThat(createTableFuture, willCompleteSuccessfully());
     }
 
-    private static <T> List<T> getAll(Cursor<T> cursor) {
+    private static <T> List<T> getAll(Cursor<T> cursor) throws Exception {
         try (cursor) {
             return cursor.stream().collect(Collectors.toList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
     }
+
+    private void checkStorageDestroyed(MvPartitionStorage storage) {
+        int partId = PARTITION_ID;
+
+        assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> null));
+
+        assertThrows(StorageClosedException.class, storage::flush);
+
+        assertThrows(StorageClosedException.class, storage::lastAppliedIndex);
+        assertThrows(StorageClosedException.class, storage::lastAppliedTerm);
+        assertThrows(StorageClosedException.class, storage::persistedIndex);
+
+        assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> {

Review Comment:
   Do we need this check if `runConsistently()` with an empty action already throws the same exception?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -419,11 +505,69 @@ private static void createTestTable(TableConfiguration tableConfig) {
         assertThat(createTableFuture, willCompleteSuccessfully());
     }
 
-    private static <T> List<T> getAll(Cursor<T> cursor) {
+    private static <T> List<T> getAll(Cursor<T> cursor) throws Exception {
         try (cursor) {
             return cursor.stream().collect(Collectors.toList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
     }
+
+    private void checkStorageDestroyed(MvPartitionStorage storage) {
+        int partId = PARTITION_ID;
+
+        assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> null));
+
+        assertThrows(StorageClosedException.class, storage::flush);
+
+        assertThrows(StorageClosedException.class, storage::lastAppliedIndex);
+        assertThrows(StorageClosedException.class, storage::lastAppliedTerm);
+        assertThrows(StorageClosedException.class, storage::persistedIndex);

Review Comment:
   Let's add the same check for `MvPartitionStorage#committedGroupConfiguration()`.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -398,23 +421,35 @@ public ReadResult next() {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
+        checkClosed();
+
         return map.ceilingKey(lowerBound);
     }
 
-    /** {@inheritDoc} */
     @Override
     public long rowsCount() {
+        checkClosed();
+
         return map.size();
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close() {
-        // No-op.
+        started = false;

Review Comment:
   This looks strange: how can something that has already started become "not started"? I suggest renaming the field to `closed` and flipping its semantics: initially it's `false`, here we switch it to `true`, and `checkClosed()` checks whether it was closed.
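A sketch of the suggested `closed` semantics, assuming a hypothetical `ClosableTestStorage` class and using `IllegalStateException` as a stand-in for `StorageClosedException`. The flag starts as `false`, `close()` flips it to `true` with `compareAndSet` (which also reports a second, redundant close), and `checkClosed()` guards each operation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ClosableTestStorage {
    /** Starts as false; flipped to true exactly once by close(). */
    private final AtomicBoolean closed = new AtomicBoolean();

    /** Placeholder state for the sketch. */
    private final long rowsCount = 0;

    /** Returns true only for the call that actually closed the storage, so double closure is detectable. */
    boolean close() {
        return closed.compareAndSet(false, true);
    }

    long rowsCount() {
        checkClosed();
        return rowsCount;
    }

    private void checkClosed() {
        if (closed.get()) {
            // Stand-in for StorageClosedException from the Ignite storage API.
            throw new IllegalStateException("Storage is already closed");
        }
    }
}
```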



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -48,6 +49,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
     private final SortedIndexDescriptor descriptor;
 
+    private volatile boolean started = true;

Review Comment:
   Same thing about `started` and `closed`



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -51,6 +64,13 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
     /** Highest possible RowId according to signed long ordering. */
     private final RowId highestRowId;
 
+    /** Busy lock for synchronous closing. */
+    private final IgniteSpinBusyLock closeBusyLock = new IgniteSpinBusyLock();
+
+    /** To avoid double closure. */
+    @SuppressWarnings("unused")
+    private volatile boolean started = true;

Review Comment:
   Same thing about `started` and `closed`



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -57,6 +69,13 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
     /** Highest possible RowId according to signed long ordering. */
     private final RowId highestRowId;
 
+    /** Busy lock for synchronous closing. */
+    private final IgniteSpinBusyLock closeBusyLock = new IgniteSpinBusyLock();
+
+    /** To avoid double closure. */
+    @SuppressWarnings("unused")
+    private volatile boolean started = true;

Review Comment:
   `started` and `closed`, as above



##########
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java:
##########
@@ -31,13 +33,13 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Tests for {@link PersistentPageMemoryTableStorage} class.
  */
-@ExtendWith(WorkDirectoryExtension.class)
-@ExtendWith(ConfigurationExtension.class)
+@ExtendWith({ConfigurationExtension.class, WorkDirectoryExtension.class})

Review Comment:
   Both styles seem OK, but why does this one (a single annotation with an array) seem better to you? It adds some syntactic noise (braces).



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for thread-safe work with {@link PartitionProcessingCounter} for any partition of any group.
+ */
+public class PartitionProcessingCounterMap {
+    private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> processedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Callback at the beginning of checkpoint processing of a partition, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onStartPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            if (partitionProcessingCounter == null) {
+                PartitionProcessingCounter counter = new PartitionProcessingCounter();
+
+                counter.onStartPartitionProcessing();
+
+                return counter;
+            }
+
+            partitionProcessingCounter.onStartPartitionProcessing();
+
+            return partitionProcessingCounter;
+        });
+    }

Review Comment:
   Just a suggestion. This way the code is significantly shorter, but it replaces one atomic operation with two (a map update, then a counter increment). Will this difference be critical in this case? Can the start of processing race with the finish of processing?
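   
   For illustration, here is a minimal sketch of the two-step variant. The suggested code itself is not preserved in this message, so this assumes it used `computeIfAbsent` followed by an increment. `SimpleProcessingCounter`, `ProcessingCounterMapSketch` and the packed `long` key are hypothetical stand-ins for `PartitionProcessingCounter` and `GroupPartitionId`.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Simplified stand-in for PartitionProcessingCounter: only the counter is modeled.
   class SimpleProcessingCounter {
       private final AtomicInteger counter = new AtomicInteger();
   
       void onStartPartitionProcessing() {
           counter.incrementAndGet();
       }
   
       int value() {
           return counter.get();
       }
   }
   
   // Hypothetical map keyed by a packed (groupId, partitionId) long instead of GroupPartitionId.
   class ProcessingCounterMapSketch {
       private final ConcurrentMap<Long, SimpleProcessingCounter> processedPartitions = new ConcurrentHashMap<>();
   
       private static long key(int groupId, int partitionId) {
           return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
       }
   
       // Step 1: atomic get-or-create in the map; step 2: a separate atomic increment of the counter.
       void onStartPartitionProcessing(int groupId, int partitionId) {
           processedPartitions
                   .computeIfAbsent(key(groupId, partitionId), id -> new SimpleProcessingCounter())
                   .onStartPartitionProcessing();
       }
   
       int processingCount(int groupId, int partitionId) {
           SimpleProcessingCounter counter = processedPartitions.get(key(groupId, partitionId));
   
           return counter == null ? 0 : counter.value();
       }
   }
   ```
   
   Between the two steps another thread can see the counter with its old value. If the finish path removes counters that drop to zero from the map (this excerpt does not show whether it does), a start could increment a counter that has already been detached and whose future is already complete. That window is what the question above is about.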



##########
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java:
##########
@@ -101,4 +96,16 @@ public void testAbortRebalanceMvPartition() throws Exception {
     public void testFinishRebalanceMvPartition() throws Exception {
         super.testFinishRebalanceMvPartition();
     }
+
+    @Test
+    @Override
+    public void testDestroyPartition() throws Exception {
+        super.testDestroyPartition();
+
+        // Let's make sure that the checkpoint doesn't failed.

Review Comment:
   This should probably be one of "hasn't failed", "didn't fail" or "doesn't fail".



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {

Review Comment:
   `mux` now acts more as a monitor or a condition variable (waiting and notification) than as a mutex (mutual exclusion). Should the variable be renamed?
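   
   A minimal sketch of the monitor-style usage with a name that reflects its role. The body of `waitDeltaFiles()` is not shown in this excerpt, so its shape here (timed wait on a flag, then resetting the flag) is an assumption, and `DeltaFilesMonitorSketch` and `deltaFilesMonitor` are hypothetical names.
   
   ```java
   // Hypothetical stand-in for the Compactor's waiting/notification logic; names are illustrative.
   class DeltaFilesMonitorSketch {
       // Used only with synchronized/wait/notifyAll, i.e. as a monitor (condition variable), not as a plain mutex.
       private final Object deltaFilesMonitor = new Object();
   
       // Guarded by deltaFilesMonitor.
       private boolean addedDeltaFiles;
   
       void onAddingDeltaFiles() {
           synchronized (deltaFilesMonitor) {
               addedDeltaFiles = true;
   
               deltaFilesMonitor.notifyAll();
           }
       }
   
       // Waits up to timeoutMillis for new delta files; consumes the signal and returns true if it arrived.
       boolean waitDeltaFiles(long timeoutMillis) {
           long deadline = System.currentTimeMillis() + timeoutMillis;
   
           synchronized (deltaFilesMonitor) {
               // Loop guards against spurious wakeups.
               while (!addedDeltaFiles) {
                   long remaining = deadline - System.currentTimeMillis();
   
                   if (remaining <= 0) {
                       return false;
                   }
   
                   try {
                       deltaFilesMonitor.wait(remaining);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
   
                       return false;
                   }
               }
   
               addedDeltaFiles = false;
   
               return true;
           }
       }
   }
   ```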



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java:
##########
@@ -458,6 +448,47 @@ void testNextCheckpointInterval() throws Exception {
         );
     }
 
+    @Test
+    void testOnPartitionDestruction() throws Exception {
+        Checkpointer checkpointer = new Checkpointer(
+                "test",
+                null,
+                null,
+                mock(CheckpointWorkflow.class),
+                mock(CheckpointPagesWriterFactory.class),
+                mock(FilePageStoreManager.class),
+                mock(Compactor.class),
+                checkpointConfig
+        );
+
+        int groupId = 0;
+        int partitionId = 0;
+
+        // Everything should be fine as there is no current running checkpoint.
+        checkpointer.onPartitionDestruction(groupId, partitionId);
+
+        CheckpointProgressImpl checkpointProgress = (CheckpointProgressImpl) checkpointer.scheduledProgress();
+
+        checkpointer.startCheckpointProgress();
+
+        checkpointer.onPartitionDestruction(groupId, partitionId);
+
+        checkpointProgress.transitTo(LOCK_RELEASED);
+        assertTrue(checkpointProgress.inProgress());
+
+        // Everything should be fine so on a "working" checkpoint we don't process the partition anything.

Review Comment:
   ```suggestion
           // Everything should be fine so on a "working" checkpoint we don't process the partition anyhow.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and
+ * the {@link #future future}.
+ */
+public class PartitionProcessingCounter {
+    private static final VarHandle COUNTER;
+
+    static {
+        try {
+            COUNTER = MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class, "counter", int.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /** Partition processing counter must be greater than or equal to zero. */
+    @SuppressWarnings("unused")
+    private volatile int counter;
+
+    /** Future that will be completed when the {@link #counter} is zero. */
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    /**
+     * Callback at the start of partition processing.
+     */
+    public void onStartPartitionProcessing() {
+        assert !future.isDone();
+
+        int updatedValue = (int) COUNTER.getAndAdd(this, 1) + 1;
+
+        assert updatedValue > 0 : updatedValue;
+    }
+
+    /**
+     * Callback at the finish of partition processing.
+     */
+    public void onFinishPartitionProcessing() {
+        assert !future.isDone();
+
+        int updatedValue = (int) COUNTER.getAndAdd(this, -1) - 1;

Review Comment:
   This looks analogous to `counter.decrementAndGet()` (when using an `AtomicInteger`).
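   
   To make the equivalence concrete: `VarHandle.getAndAdd` returns the previous value, so adding or subtracting 1 yields the updated value, exactly what `AtomicInteger.incrementAndGet()`/`decrementAndGet()` return. The class below is a hypothetical reduction of the PR's pattern, not the PR's code.
   
   ```java
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.VarHandle;
   import java.util.concurrent.atomic.AtomicInteger;
   
   class VarHandleCounterSketch {
       private static final VarHandle COUNTER;
   
       static {
           try {
               COUNTER = MethodHandles.lookup().findVarHandle(VarHandleCounterSketch.class, "counter", int.class);
           } catch (ReflectiveOperationException e) {
               throw new ExceptionInInitializerError(e);
           }
       }
   
       @SuppressWarnings("unused")
       private volatile int counter;
   
       // Pattern from the PR: getAndAdd returns the previous value, so "+ 1" / "- 1" gives the updated one.
       int incrementAndGet() {
           return (int) COUNTER.getAndAdd(this, 1) + 1;
       }
   
       int decrementAndGet() {
           return (int) COUNTER.getAndAdd(this, -1) - 1;
       }
   
       // Runs the same sequence against an AtomicInteger and checks that every returned value matches.
       static boolean matchesAtomicInteger(int steps) {
           VarHandleCounterSketch viaVarHandle = new VarHandleCounterSketch();
           AtomicInteger viaAtomic = new AtomicInteger();
   
           for (int i = 0; i < steps; i++) {
               if (viaVarHandle.incrementAndGet() != viaAtomic.incrementAndGet()) {
                   return false;
               }
           }
   
           for (int i = 0; i < steps; i++) {
               if (viaVarHandle.decrementAndGet() != viaAtomic.decrementAndGet()) {
                   return false;
               }
           }
   
           return true;
       }
   }
   ```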



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
 
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {
+                break;
+            }
+
+            updateHeartbeat();
 
-        assert !queue.isEmpty();
+            int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
 
-        updateHeartbeat();
+            CompletableFuture<?>[] futures = new CompletableFuture[threads];
 
-        int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
+            for (int i = 0; i < threads; i++) {
+                CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
 
-        CompletableFuture<?>[] futures = new CompletableFuture[threads];
+                Runnable merger = () -> {
+                    DeltaFileToCompaction toMerge;
 
-        for (int i = 0; i < threads; i++) {
-            CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
+                    try {
+                        while ((toMerge = queue.poll()) != null) {

Review Comment:
   This 'assignment inside another expression' idiom is sneaky: it makes the code harder to understand, while the gain is really small (just one line saved?). I think this idiom does more harm than good.
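   
   One possible alternative, sketched below: poll into a local variable and break explicitly. `QueueDrainSketch` and the `Consumer`-based merger are hypothetical simplifications of the compaction loop, not its actual structure.
   
   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.function.Consumer;
   
   class QueueDrainSketch {
       // Drains the queue without assigning inside the while condition; returns how many items were processed.
       static <T> int drain(ConcurrentLinkedQueue<T> queue, Consumer<T> merger) {
           int processed = 0;
   
           while (true) {
               T toMerge = queue.poll();
   
               if (toMerge == null) {
                   break;
               }
   
               merger.accept(toMerge);
   
               processed++;
           }
   
           return processed;
       }
   }
   ```
   
   A `for (T toMerge = queue.poll(); toMerge != null; toMerge = queue.poll())` loop would be another option that keeps the variable scoped to the loop without hiding the assignment in a condition.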



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -382,6 +389,85 @@ public void testFinishRebalanceMvPartition() throws Exception {
         assertDoesNotThrow(() -> tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testDestroyPartition() throws Exception {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> tableStorage.destroyPartition(tableStorage.configuration().partitions().value())
+        );
+
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
+
+            sortedIndexStorage.put(indexRow);
+
+            return null;
+        });
+
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
+
+        tableStorage.destroyPartition(PARTITION_ID);
+
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
+
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
+
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
+
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
+
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
+
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        tableStorage.destroyPartition(PARTITION_ID);

Review Comment:
   Let's wrap this in `assertDoesNotThrow()` to signal to the reader that we want to make sure nothing explodes.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and
+ * the {@link #future future}.
+ */
+public class PartitionProcessingCounter {
+    private static final VarHandle COUNTER;
+
+    static {
+        try {
+            COUNTER = MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class, "counter", int.class);

Review Comment:
   This looks like a replacement for an `AtomicInteger`. An `AtomicInteger` would make the code more concise and easier to understand: you could use `incrementAndGet()` and `decrementAndGet()` below.
   
   You probably wanted to avoid extra object instances with this approach, but it's just one instance per partition. It seems unlikely that anyone will have even a million partitions in the same JVM, so the saving is not worth the code complexity.
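   
   A minimal sketch of what the class could look like with an `AtomicInteger`. It assumes, as the class Javadoc says, that the future is completed when the counter drops back to zero; that part of `onFinishPartitionProcessing()` is not visible in this excerpt. The class name is hypothetical.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;
   
   /** Hypothetical AtomicInteger-based variant of PartitionProcessingCounter. */
   class AtomicPartitionProcessingCounter {
       /** Number of in-flight processings of the partition; must not go below zero. */
       private final AtomicInteger counter = new AtomicInteger();
   
       /** Completed when the counter drops back to zero. */
       private final CompletableFuture<Void> future = new CompletableFuture<>();
   
       void onStartPartitionProcessing() {
           assert !future.isDone();
   
           int updatedValue = counter.incrementAndGet();
   
           assert updatedValue > 0 : updatedValue;
       }
   
       void onFinishPartitionProcessing() {
           assert !future.isDone();
   
           int updatedValue = counter.decrementAndGet();
   
           assert updatedValue >= 0 : updatedValue;
   
           if (updatedValue == 0) {
               future.complete(null);
           }
       }
   
       CompletableFuture<Void> future() {
           return future;
       }
   }
   ```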



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -198,6 +204,7 @@ public void testCreateHashIndex() {
     /**
      * Tests destroying an index.
      */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17626")

Review Comment:
   The referenced Jira issue is about pagemem-based storages, but the test is disabled for all storage implementations. Should other Jira issues be mentioned here as well?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -139,31 +149,63 @@ public void start() throws IgniteInternalCheckedException {
             throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e);
         }
 
-        if (log.isWarnEnabled()) {
+        if (LOG.isWarnEnabled()) {
             String tmpDir = System.getProperty("java.io.tmpdir");
 
             if (tmpDir != null && this.dbDir.startsWith(tmpDir)) {
-                log.warn("Persistence store directory is in the temp directory and may be cleaned. "
+                LOG.warn("Persistence store directory is in the temp directory and may be cleaned. "
                         + "To avoid this change location of persistence directories [currentDir={}]", this.dbDir);
             }
         }
 
+        List<Path> toDelete = new ArrayList<>();
+
         try (Stream<Path> tmpFileStream = Files.find(
                 dbDir,
                 Integer.MAX_VALUE,
-                (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX)
-        )) {
-            List<Path> tmpFiles = tmpFileStream.collect(toList());
+                (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX))
+        ) {
+            toDelete.addAll(tmpFileStream.collect(toList()));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Error while searching temporary files:" + dbDir, e);
+        }
+
+        Pattern delPartitionFilePatter = Pattern.compile(DEL_PART_FILE_REGEXP);

Review Comment:
   ```suggestion
           Pattern delPartitionFilePattern = Pattern.compile(DEL_PART_FILE_REGEXP);
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -418,4 +430,43 @@ public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) {
         // TODO: IGNITE-18029 Implement
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) throws StorageException {
+        int partitionId = mvPartitionStorage.partitionId();
+
+        CompletableFuture<Void> previousFuture = destroyFutureByPartitionId.put(partitionId, new CompletableFuture<>());
+
+        assert previousFuture == null : "Previous destruction of the partition has not completed: " + partitionId;
+
+        // It is enough for us to close the partition storage and its indexes (do not destroy), perform some actions, and then simply

Review Comment:
   What are 'some actions'? This is very mysterious :)



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java:
##########
@@ -40,6 +41,8 @@ public class TestHashIndexStorage implements HashIndexStorage {
 
     private final HashIndexDescriptor descriptor;
 
+    private volatile boolean started = true;

Review Comment:
   `closed` would seem more logical: making something "not started" after it has been started is a bit strange.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java:
##########
@@ -216,6 +217,7 @@ public void testRemoveIdempotence() {
         assertDoesNotThrow(() -> remove(row));
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17626")

Review Comment:
   The referenced Jira issue is about pagemem-based indices, but the test is disabled for all index implementations. Should other Jira issues be mentioned here as well?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -120,22 +188,63 @@ public void remove(IndexRow row) {
             remove.afterCompletion();
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException("Failed to remove value from index", e);
+        } finally {
+            closeBusyLock.leaveBusy();
         }
     }
 
     @Override
     public Cursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
-        boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
-        boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
-        SortedIndexRowKey lower = createBound(lowerBound, !includeLower);
-
-        SortedIndexRowKey upper = createBound(upperBound, includeUpper);
+        if (!closeBusyLock.enterBusy()) {
+            throwStorageClosedException();
+        }
 
         try {
-            return map(sortedIndexTree.find(lower, upper), this::toIndexRowImpl);
+            boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+            boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+            SortedIndexRowKey lower = createBound(lowerBound, !includeLower);
+
+            SortedIndexRowKey upper = createBound(upperBound, includeUpper);
+
+            Cursor<SortedIndexRow> cursor = sortedIndexTree.find(lower, upper);
+
+            return new Cursor<>() {
+                @Override
+                public void close() {
+                    cursor.close();
+                }
+
+                @Override
+                public boolean hasNext() {

Review Comment:
   How about extracting a base class containing the `hasNext()` and `close()` implementations, as is done in `BusyRocksIteratorAdapter`? This would reduce duplication.
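   
   A rough sketch of such a base class, under stated assumptions: `SimpleCursor` and `BusyLock` are hypothetical stand-ins for Ignite's `Cursor` and `IgniteSpinBusyLock`, `IllegalStateException` stands in for `StorageClosedException`, and `FlagBusyLock` is a demo-only lock that, unlike a real busy lock, does not wait for in-flight calls. The shape is not taken from `BusyRocksIteratorAdapter` itself.
   
   ```java
   import java.util.Iterator;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Supplier;
   
   // Hypothetical minimal cursor contract (stand-in for Ignite's Cursor).
   interface SimpleCursor<T> extends Iterator<T>, AutoCloseable {
       @Override
       void close();
   
       static <T> SimpleCursor<T> of(Iterator<T> it) {
           return new SimpleCursor<T>() {
               @Override public boolean hasNext() { return it.hasNext(); }
               @Override public T next() { return it.next(); }
               @Override public void close() { }
           };
       }
   }
   
   // Hypothetical busy-lock contract (stand-in for IgniteSpinBusyLock's enterBusy/leaveBusy).
   interface BusyLock {
       boolean enterBusy();
   
       void leaveBusy();
   }
   
   // Demo-only lock: refuses entry after close(), but does not wait for in-flight calls.
   class FlagBusyLock implements BusyLock {
       private final AtomicBoolean closed = new AtomicBoolean();
   
       @Override public boolean enterBusy() { return !closed.get(); }
       @Override public void leaveBusy() { }
       void close() { closed.set(true); }
   }
   
   // Shared base: hasNext(), next() and close() are written once; subclasses only convert rows.
   abstract class BusyCursorAdapter<S, T> implements SimpleCursor<T> {
       private final SimpleCursor<S> delegate;
       private final BusyLock busyLock;
   
       BusyCursorAdapter(SimpleCursor<S> delegate, BusyLock busyLock) {
           this.delegate = delegate;
           this.busyLock = busyLock;
       }
   
       /** Converts a row of the underlying cursor into the public row type. */
       protected abstract T map(S source);
   
       private <R> R busy(Supplier<R> action) {
           if (!busyLock.enterBusy()) {
               throw new IllegalStateException("Storage is closed");
           }
   
           try {
               return action.get();
           } finally {
               busyLock.leaveBusy();
           }
       }
   
       @Override
       public boolean hasNext() {
           return busy(delegate::hasNext);
       }
   
       @Override
       public T next() {
           return busy(() -> map(delegate.next()));
       }
   
       @Override
       public void close() {
           delegate.close();
       }
   }
   ```
   
   Each index storage would then only provide `map()` (for example, converting a tree row into an `IndexRow`), while the busy-lock handling lives in one place.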



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -17,22 +17,35 @@
 
 package org.apache.ignite.internal.storage.pagememory.index.hash;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
 import org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumnsFreeList;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.CursorUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
- * Hash index storage implementation.
+ * Implementation of Hash index storage using Page Memory.
  */
 public class PageMemoryHashIndexStorage implements HashIndexStorage {
+    private static final VarHandle STARTED;
+
+    static {
+        try {
+            STARTED = MethodHandles.lookup().findVarHandle(PageMemoryHashIndexStorage.class, "started", boolean.class);

Review Comment:
   Let's use `AtomicBoolean` instead. It makes the code simpler, and the price we pay (one object per index per partition) is tiny. The same applies to all pagemem-based index and `MvPartitionStorage` implementations.
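   
   A minimal sketch of an idempotent close guarded by an `AtomicBoolean`, also using the `closed` naming suggested in the other comments. `IndexStorageCloseSketch` is hypothetical; where the real storage would block its busy lock and release resources is only marked with a comment.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical sketch: idempotent close guarded by AtomicBoolean instead of a VarHandle.
   class IndexStorageCloseSketch {
       /** Named "closed" rather than "started". */
       private final AtomicBoolean closed = new AtomicBoolean();
   
       /** Demo-only: counts how many times the close logic actually ran. */
       private int closeCount;
   
       /** Closes the storage; repeated calls are no-ops. Returns true only for the call that actually closed it. */
       boolean close() {
           if (!closed.compareAndSet(false, true)) {
               return false;
           }
   
           // Assumption: the real storage would block its busy lock and release resources here.
           closeCount++;
   
           return true;
       }
   
       boolean isClosed() {
           return closed.get();
       }
   
       int closeCount() {
           return closeCount;
       }
   }
   ```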



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org