Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/09 17:10:17 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

tkalkirill opened a new pull request, #1506:
URL: https://github.com/apache/ignite-3/pull/1506

   https://issues.apache.org/jira/browse/IGNITE-18029


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072180599


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -88,25 +92,46 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
     /** Partition id for 1 storage. */
     protected static final int PARTITION_ID_1 = 1 << 8;
 
-    private MvTableStorage tableStorage;
+    protected MvTableStorage tableStorage;
 
     private TableIndexView sortedIdx;
 
     private TableIndexView hashIdx;
 
+    private StorageEngine storageEngine;
+
     /**
      * Initializes the internal structures needed for tests.
      *
      * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
      */
-    protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
-        createTestTable(tableStorage.configuration());
-        createTestIndexes(tablesCfg);
+    protected final void initialize(StorageEngine storageEngine, TablesConfiguration tablesConfig) {
+        createTestTable(getTableConfig(tablesConfig));
+        createTestIndexes(tablesConfig);
+
+        this.storageEngine = storageEngine;

Review Comment:
   Makes sense, fixed it.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072346248


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -96,49 +98,22 @@ public boolean isVolatile() {
 
     @Override
     protected void finishDestruction() {
-        dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
+        dataRegion.pageMemory().onGroupDestroyed(tableConfig.tableId().value());
     }
 
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
-        CompletableFuture<Void> partitionDestroyFuture = partitionIdDestroyFutureMap.get(partitionId);
+        waitPartitionToBeDestroyed(partitionId);
 
-        if (partitionDestroyFuture != null) {
-            try {
-                // Time is chosen randomly (long enough) so as not to call #join().
-                partitionDestroyFuture.get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException("Error waiting for the destruction of the previous version of the partition: " + partitionId, e);
-            }
-        }
+        TableView tableView = tableConfig.value();
 
-        TableView tableView = tableCfg.value();
+        GroupPartitionId groupPartitionId = createGroupPartitionId(partitionId);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableView.tableId(), partitionId);
+        PartitionMeta meta = getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(groupPartitionId);

Review Comment:
   I didn't put it in a separate lock block; I moved it outside the lock instead.
   This is safe because we read the `PartitionMeta` directly from the file rather than through `PageMemory`.
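For context, the hunk above replaces the inline wait on `partitionIdDestroyFutureMap` with a call to `waitPartitionToBeDestroyed`, whose body is not part of this diff. Reconstructed from the removed lines, a minimal sketch of such a helper might look like the following. `PartitionDestroyWaiter` and the local `StorageException` class are hypothetical stand-ins, and restoring the interrupt flag is an addition that the removed code did not have.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the storage exception used in the diff. */
class StorageException extends RuntimeException {
    StorageException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical holder for the futures of partitions that are still being destroyed. */
class PartitionDestroyWaiter {
    /** Pending destruction futures, keyed by partition ID. */
    final ConcurrentMap<Integer, CompletableFuture<Void>> partitionIdDestroyFutureMap = new ConcurrentHashMap<>();

    /** Waits, with a bounded timeout instead of join(), for a pending destruction of the partition, if any. */
    void waitPartitionToBeDestroyed(int partitionId) {
        CompletableFuture<Void> partitionDestroyFuture = partitionIdDestroyFutureMap.get(partitionId);

        if (partitionDestroyFuture == null) {
            return; // Nothing is being destroyed for this partition.
        }

        try {
            partitionDestroyFuture.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // Preserve the interrupt status for callers.
            }

            throw new StorageException(
                    "Error waiting for the destruction of the previous version of the partition: " + partitionId, e);
        }
    }
}
```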



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1073650142


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -811,8 +885,8 @@ private void checkForPresenceRows(
     ) {
         for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
             assertThat(
-                    getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
-                    containsInAnyOrder(row.get2())
+                    toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1())),
+                    containsInAnyOrder(row.get2().bytes())

Review Comment:
   Ok



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072447536


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -987,24 +909,174 @@ public boolean hasNext() {
                     ReadResult result = findLatestRowVersion(chain);
 
                     if (result.isEmpty() && !result.isWriteIntent()) {
-                        continue;
+                        return null;
                     }
 
                     nextRead = result;
                     currentChain = chain;
 
                     return true;
-                } finally {
-                    closeBusyLock.leaveBusy();
+                });
+
+                if (hasNext != null) {
+                    return hasNext;
                 }
             }
         }
     }
 
+    private class ScanVersionsCursor implements Cursor<ReadResult> {
+        final RowId rowId;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private VersionChain versionChain;
+
+        @Nullable
+        private RowVersion rowVersion;
+
+        private ScanVersionsCursor(RowId rowId) {
+            this.rowId = rowId;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                return hasNext;
+            });
+        }
+
+        @Override
+        public ReadResult next() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                hasNext = null;
+
+                return rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
+            });
+        }
+
+        private void advanceIfNeeded() {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+
+            if (hasNext != null) {
+                return;
+            }
+
+            if (versionChain == null) {
+                try {
+                    versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException(e);
+                }
+
+                rowVersion = versionChain == null ? null : readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
+            } else {
+                rowVersion = !rowVersion.hasNextLink() ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
+            }
+
+            hasNext = rowVersion != null;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
+            return;
+        }
+
+        busyLock.block();
+
+        versionChainTree.close();

Review Comment:
   Makes sense, I'll do it.
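The new `ScanVersionsCursor` uses a nullable `Boolean hasNext` as a tri-state flag: `null` means the next element has not been computed yet, so repeated `hasNext()` calls do not advance the chain, and `next()` resets the flag to `null`. The sketch below shows that pattern over a hypothetical in-memory linked list (`RowVersion` with a `next` reference) instead of `versionChainTree` and `readRowVersion`, and it omits the busy lock and rebalance checks.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

class VersionChainCursorSketch {
    /** Hypothetical row version: a value plus a link to the next (older) version, or null. */
    static final class RowVersion {
        final String value;
        final RowVersion next;

        RowVersion(String value, RowVersion next) {
            this.value = value;
            this.next = next;
        }
    }

    /** Lazily walks the chain from the head; hasNext == null means "not computed yet". */
    static final class ScanVersionsCursor implements Iterator<String> {
        private final RowVersion head;

        private Boolean hasNext;

        private boolean started;

        private RowVersion current;

        ScanVersionsCursor(RowVersion head) {
            this.head = head;
        }

        @Override
        public boolean hasNext() {
            advanceIfNeeded();

            return hasNext;
        }

        @Override
        public String next() {
            advanceIfNeeded();

            if (!hasNext) {
                throw new NoSuchElementException();
            }

            hasNext = null; // Force the next call to advance to the following version.

            return current.value;
        }

        private void advanceIfNeeded() {
            if (hasNext != null) {
                return; // Already computed; hasNext() is idempotent.
            }

            if (!started) {
                started = true;
                current = head;
            } else {
                current = current == null ? null : current.next;
            }

            hasNext = current != null;
        }
    }
}
```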



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072249794


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Tried to fix it.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Tried to fix it.
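The hunks replace the explicit `closeBusyLock.enterBusy()` / `try` / `finally leaveBusy()` pattern with a `busy(...)` helper whose definition is not shown here. A minimal sketch of such a helper follows, assuming a hypothetical `SimpleBusyLock` built on `ReentrantReadWriteLock` (Ignite's own busy lock is implemented differently) and `IllegalStateException` in place of the storage-closed exception.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class BusySketch {
    /** Hypothetical busy lock: many concurrent busy sections, permanently rejected after block(). */
    static final class SimpleBusyLock {
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

        private volatile boolean blocked;

        boolean enterBusy() {
            if (!rwLock.readLock().tryLock()) {
                return false; // block() currently holds the write lock.
            }

            if (blocked) {
                rwLock.readLock().unlock();

                return false;
            }

            return true;
        }

        void leaveBusy() {
            rwLock.readLock().unlock();
        }

        /** Waits for in-flight busy sections to finish, then rejects all new ones. */
        void block() {
            rwLock.writeLock().lock();

            try {
                blocked = true;
            } finally {
                rwLock.writeLock().unlock();
            }
        }
    }

    private final SimpleBusyLock busyLock = new SimpleBusyLock();

    /** Runs the supplier inside the busy lock, replacing the hand-written enterBusy/finally/leaveBusy code. */
    <V> V busy(Supplier<V> supplier) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Storage is closed");
        }

        try {
            return supplier.get();
        } finally {
            busyLock.leaveBusy();
        }
    }

    void close() {
        busyLock.block();
    }
}
```

Centralizing the pattern in one helper removes the risk of a method forgetting `leaveBusy()` on one of its exit paths.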



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072334426


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());

Review Comment:
   I think we need to fix this in the future.
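In `destroyPartitionPhysically`, the file store is marked for destruction and the cached pages are invalidated synchronously; the checkpointer notification, meta removal, and file deletion are then chained asynchronously, and `clearStorageAndUpdateDataStructures` recreates the data structures only after that chain completes. The sketch below models only that ordering with `CompletableFuture`. `PhysicalDestroySketch` and all of its stage methods are hypothetical placeholders that just record their names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class PhysicalDestroySketch {
    /** Records the order in which the hypothetical stages ran. */
    final List<String> log = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> onPartitionDestruction(int partitionId) {
        log.add("onPartitionDestruction:" + partitionId);

        return CompletableFuture.completedFuture(null);
    }

    void removeMeta(int partitionId) {
        log.add("removeMeta:" + partitionId);
    }

    CompletableFuture<Void> destroyPartitionFiles(int partitionId) {
        return CompletableFuture.runAsync(() -> log.add("destroyPartitionFiles:" + partitionId));
    }

    CompletableFuture<Void> destroyPartitionPhysically(int partitionId) {
        // Synchronous steps first: mark the file store and invalidate cached pages.
        log.add("markToDestroy:" + partitionId);
        log.add("invalidate:" + partitionId);

        // Then wait for the checkpointer, drop the meta, and finally delete the files.
        return onPartitionDestruction(partitionId)
                .thenAccept(unused -> removeMeta(partitionId))
                .thenCompose(unused -> destroyPartitionFiles(partitionId));
    }

    CompletableFuture<Void> clearStorageAndRecreate(int partitionId) {
        // Recreate free lists and trees only once the old partition is fully gone.
        return destroyPartitionPhysically(partitionId)
                .thenAccept(unused -> log.add("recreateDataStructures:" + partitionId));
    }
}
```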



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072202522


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(
+            int partitionId,
+            Function<String, ? extends StorageException> missingStorageExceptionFunction
+    ) {
+        AbstractPageMemoryMvPartitionStorage mvPartitionStorage = getMvPartitionStorageWithoutBusyLock(partitionId);
+
+        if (mvPartitionStorage == null) {
+            throw missingStorageExceptionFunction.apply(IgniteStringFormatter.format("Partition ID {} does not exist", partitionId));
+        }
+
+        return mvPartitionStorage;
+    }
+
+    /**
+     * Returns table name.
+     */
+    public String getTableName() {

Review Comment:
   Agreed.
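A self-contained sketch of the bounds check and the two `getMvPartitionStorageWithoutBusyLock` overloads follows. It assumes that `mvPartitions` is an `AtomicReferenceArray`, as the `length()` and `get(...)` calls suggest. `PartitionLookupSketch` is hypothetical, the storage type is left generic, the exception factory produces any `RuntimeException` rather than a `StorageException`, and `String.format` stands in for `IgniteStringFormatter.format` (which uses `{}` placeholders).

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

class PartitionLookupSketch<S> {
    private final String tableName;

    /** One slot per configured partition; null means the storage has not been created. */
    private final AtomicReferenceArray<S> mvPartitions;

    PartitionLookupSketch(String tableName, int partitions) {
        this.tableName = tableName;
        this.mvPartitions = new AtomicReferenceArray<>(partitions);
    }

    void set(int partitionId, S storage) {
        checkPartitionId(partitionId);

        mvPartitions.set(partitionId, storage);
    }

    private void checkPartitionId(int partitionId) {
        int partitions = mvPartitions.length();

        if (partitionId < 0 || partitionId >= partitions) {
            throw new IllegalArgumentException(String.format(
                    "Unable to access partition with id outside of configured range: [table=%s, partitionId=%d, partitions=%d]",
                    tableName, partitionId, partitions));
        }
    }

    /** Returns the storage, or null if it does not exist. */
    S getMvPartitionStorageWithoutBusyLock(int partitionId) {
        checkPartitionId(partitionId);

        return mvPartitions.get(partitionId);
    }

    /** Returns the storage, or throws the exception built by the caller-supplied factory if it is missing. */
    S getMvPartitionStorageWithoutBusyLock(
            int partitionId,
            Function<String, ? extends RuntimeException> missingStorageExceptionFunction
    ) {
        S storage = getMvPartitionStorageWithoutBusyLock(partitionId);

        if (storage == null) {
            throw missingStorageExceptionFunction.apply(String.format("Partition ID %d does not exist", partitionId));
        }

        return storage;
    }
}
```

Passing the exception factory lets each caller choose the exception type (for example, a rebalance-specific one) while sharing the lookup logic.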



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072414034


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {

Review Comment:
   Tried to fix it.
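`inCheckpointLock` wraps a `Supplier` in `checkpointReadLock()` / `checkpointReadUnlock()`, and the page allocation listener installed in `getOrCreatePartitionMeta` asserts that the checkpoint lock is held by the current thread. The sketch below mimics both with a plain `ReentrantReadWriteLock`. This is an assumption, since the real `CheckpointTimeoutLock` is not shown in this diff. `CheckpointLockSketch` is hypothetical, and it throws instead of using `assert` so that the check also runs without `-ea`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

class CheckpointLockSketch {
    /** Stand-in for the checkpoint timeout lock: readers are regular operations, the writer is the checkpointer. */
    private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    /** Stand-in for PartitionMeta#incrementPageCount. */
    final AtomicInteger pageCount = new AtomicInteger();

    /** Like the listener in the diff, this must only run while the checkpoint read lock is held. */
    final IntConsumer pageAllocationListener = pageIdx -> {
        if (!checkpointLockIsHeldByThread()) {
            throw new IllegalStateException("Checkpoint read lock is not held");
        }

        pageCount.incrementAndGet();
    };

    boolean checkpointLockIsHeldByThread() {
        return checkpointLock.getReadHoldCount() > 0 || checkpointLock.isWriteLockedByCurrentThread();
    }

    /** Runs the supplier under the checkpoint read lock and always releases it. */
    <V> V inCheckpointLock(Supplier<V> supplier) {
        checkpointLock.readLock().lock();

        try {
            return supplier.get();
        } finally {
            checkpointLock.readLock().unlock();
        }
    }
}
```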



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072328103


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -210,7 +177,8 @@ private FilePageStore ensurePartitionFilePageStore(TableView tableView, GroupPar
     /**
      * Returns id of the last started checkpoint, or {@code null} if no checkpoints were started yet.
      */
-    public @Nullable UUID lastCheckpointId() {
+    @Nullable
+    private UUID lastCheckpointId() {

Review Comment:
   Fixed it.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072510827


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   I tried to modify it.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   I tried to modify it.
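Both `startRebalance()` methods and the new `close()` rely on a single `state` field updated with `compareAndSet`, so only one of the competing transitions out of `RUNNABLE` can succeed. The following is a minimal sketch of those transitions with `AtomicReference`, assuming only the three `StorageState` values that appear in the hunks. `StorageStateSketch` is hypothetical, and it throws `IllegalStateException` where the diff calls `throwExceptionDependingOnStorageStateOnRebalance`.

```java
import java.util.concurrent.atomic.AtomicReference;

class StorageStateSketch {
    /** Only the states visible in the hunks; the real enum may have more. */
    enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    /** RUNNABLE -> REBALANCE; any other starting state is rejected. */
    void startRebalance() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
            throw new IllegalStateException("Cannot start rebalance in state " + state.get());
        }

        // In the diff, this is followed by stopping ongoing operations (code not shown here).
    }

    /** RUNNABLE -> CLOSED; returns false if the storage was already closed. */
    boolean close() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
            StorageState current = state.get();

            // Mirrors the diff's assertion that the only other expected state here is CLOSED.
            if (current != StorageState.CLOSED) {
                throw new IllegalStateException("Unexpected state on close: " + current);
            }

            return false;
        }

        // In the diff, this is followed by busyLock.block() and closing the index tree.
        return true;
    }
}
```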



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072184092


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -811,8 +885,8 @@ private void checkForPresenceRows(
     ) {
         for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
             assertThat(
-                    getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
-                    containsInAnyOrder(row.get2())
+                    toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1())),
+                    containsInAnyOrder(row.get2().bytes())

Review Comment:
   There is no bug here: internally, if the values are arrays, the matcher compares them by length and element by element.
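For context: Hamcrest's `containsInAnyOrder(T...)` wraps each expected item in an `equalTo` matcher, and `IsEqual` compares arrays by length and element by element, so passing `byte[]` values works. Plain `equals` on arrays does not work, because it compares identity. The sketch below shows the difference using JDK calls only; `ByteArrayMatching` is a hypothetical helper, not Hamcrest's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing why byte[] values need element-wise comparison. */
class ByteArrayMatching {
    /**
     * Returns true if {@code actual} holds exactly the {@code expected} arrays in any order,
     * comparing arrays with Arrays.equals (length and elements) instead of identity.
     */
    static boolean containsInAnyOrder(List<byte[]> actual, byte[]... expected) {
        if (actual.size() != expected.length) {
            return false;
        }

        // Each actual array may satisfy at most one expected array.
        List<byte[]> remaining = new ArrayList<>(actual);

        for (byte[] e : expected) {
            boolean matched = false;

            for (int i = 0; i < remaining.size(); i++) {
                if (Arrays.equals(remaining.get(i), e)) {
                    remaining.remove(i);
                    matched = true;
                    break;
                }
            }

            if (!matched) {
                return false;
            }
        }

        return true;
    }
}
```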





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072276069


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -800,67 +732,65 @@ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageEx
             } else {
                 return new TimestampCursor(treeCursor, timestamp);
             }
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        });
     }
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
-            return cursor.hasNext() ? cursor.next().rowId() : null;
-        } catch (Exception e) {
-            throw new StorageException("Error occurred while trying to read a row id", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
+                return cursor.hasNext() ? cursor.next().rowId() : null;
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to read a row id", e);
+            }
+        });
     }
 
     @Override
     public long rowsCount() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            return versionChainTree.size();
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error occurred while fetching the size.", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try {
+                return versionChainTree.size();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error occurred while fetching the size", e);
+            }
+        });
     }
 
     private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
-        protected final Cursor<VersionChain> treeCursor;
+        final Cursor<VersionChain> treeCursor;

Review Comment:
   Fixed.
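`closestRowId(lowerBound)` opens a cursor starting at `lowerBound` and returns its first element: the smallest row ID not less than `lowerBound`, or `null` if there is none. Assuming `versionChainTree.find(new VersionChainKey(lowerBound), null)` behaves as an inclusive-lower, unbounded-upper range scan, the same contract can be illustrated with a `TreeMap`. `RowIdIndexSketch` and the `long` row IDs are hypothetical simplifications of `VersionChainTree` and `RowId`.

```java
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a version chain tree keyed by row ID. */
class RowIdIndexSketch {
    private final NavigableMap<Long, String> chains = new TreeMap<>();

    void put(long rowId, String value) {
        chains.put(rowId, value);
    }

    /** Smallest row ID >= lowerBound, or null: open a range cursor and take its first element. */
    Long closestRowId(long lowerBound) {
        Iterator<Long> cursor = chains.tailMap(lowerBound, true).keySet().iterator();

        return cursor.hasNext() ? cursor.next() : null;
    }

    long rowsCount() {
        return chains.size();
    }
}
```

`chains.ceilingKey(lowerBound)` would give the same answer in one call; the iterator form is kept only to mirror the cursor in the diff.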





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072329420


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {

Review Comment:
   It is not expected to be overridden; it overrides the method `org.apache.ignite.internal.storage.pagememory.AbstractPageMemoryTableStorage#clearStorageAndUpdateDataStructures`.
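The reply describes a template-method split: the base class declares `clearStorageAndUpdateDataStructures` and drives the flow, and the persistent table storage supplies the implementation. A minimal sketch of that arrangement follows. `BaseTableStorage`, `PersistentTableStorage`, `finishRebalance` and the `log` list are hypothetical names, not the Ignite classes, and declaring the subclass `final` is only one possible way to express "not expected to be overridden".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical base class that owns the control flow. */
abstract class BaseTableStorage {
    /** Records the order of calls, for illustration only. */
    final List<String> log = new ArrayList<>();

    /** Public entry point: the base class decides when the hook runs. */
    CompletableFuture<Void> finishRebalance(int partitionId) {
        log.add("finish:" + partitionId);

        return clearStorageAndUpdateDataStructures(partitionId);
    }

    /** Package-private hook implemented by each concrete storage. */
    abstract CompletableFuture<Void> clearStorageAndUpdateDataStructures(int partitionId);
}

/** Overrides the hook; final because it is not meant to be extended further. */
final class PersistentTableStorage extends BaseTableStorage {
    @Override
    CompletableFuture<Void> clearStorageAndUpdateDataStructures(int partitionId) {
        log.add("clear:" + partitionId);

        return CompletableFuture.completedFuture(null);
    }
}
```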





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072480981


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -365,4 +315,70 @@ private void syncMetadataOnCheckpoint(@Nullable Executor executor) throws Ignite
             });
         }
     }
+
+    @Override
+    public void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        lastApplied0(lastAppliedIndex, lastAppliedTerm);
+
+        persistedIndex = lastAppliedIndex;
+    }
+
+    /**
+     * Updates the internal data structures of the storage and its indexes on rebalance.
+     *
+     * @param meta Partition meta.
+     * @param rowVersionFreeList Free list for {@link RowVersion}.
+     * @param indexFreeList Free list for {@link IndexColumns}.
+     * @param versionChainTree Table tree for {@link VersionChain}.
+     * @param indexMetaTree Tree that contains SQL indexes' metadata.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(
+            PartitionMeta meta,
+            RowVersionFreeList rowVersionFreeList,
+            IndexColumnsFreeList indexFreeList,
+            VersionChainTree versionChainTree,
+            IndexMetaTree indexMetaTree
+    ) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.meta = meta;
+
+        this.rowVersionFreeList.close();

Review Comment:
   I'll try to close them at the beginning of the rebalancing.
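Closing the old structures when the rebalance starts, as proposed here, leaves `updateDataStructuresOnRebalance` with a single job: swapping references. The sketch below is a minimal illustration of that ordering. It assumes a hypothetical `TrackedResource` in place of `RowVersionFreeList`, `VersionChainTree` and the other structures, and uses `IllegalStateException` in place of `StorageRebalanceException`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical partition storage that swaps its data structures during a rebalance. */
class PartitionStructuresSketch {
    enum State { RUNNABLE, REBALANCE }

    /** Stand-in for a free list or B+ tree that must be closed before it is replaced. */
    static final class TrackedResource implements AutoCloseable {
        final String name;

        boolean closed;

        TrackedResource(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNABLE);

    volatile TrackedResource freeList = new TrackedResource("initial");

    /** Closes the current structures as soon as the rebalance starts. */
    void startRebalance() {
        if (!state.compareAndSet(State.RUNNABLE, State.REBALANCE)) {
            throw new IllegalStateException("Unexpected state: " + state.get());
        }

        freeList.close();
    }

    /** Only swaps references; the old structures were already closed in startRebalance(). */
    void updateDataStructuresOnRebalance(TrackedResource newFreeList) {
        if (state.get() != State.REBALANCE) {
            throw new IllegalStateException("Storage is not in the process of rebalancing");
        }

        freeList = newFreeList;
    }
}
```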





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072329420


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {

Review Comment:
   It is not expected to be overridden.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072141988


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Why did you have to do this? These are changes for the sake of changes. Did you want to artificially inflate your PR? Well, you succeeded in that case. Please roll it back or at least create a new "get*" method that would reduce the amount of unnecessary changes.
   I feel like half of all changes in this PR are unjustified. I don't like it. Reviewing PRs like this one is a nightmare.
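The thread does not spell out the "get*" suggestion. One reading is to move the original method body unchanged into a private helper and reduce the public method to a one-line `busy(...)` call, which keeps the diff of the body itself empty. In this sketch `get0`, `busy`, `HashIndexSketch` and the `List<Long>` return type are hypothetical stand-ins for the Ignite types.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical hash index used to show a minimal-diff way of adding busy-lock protection. */
class HashIndexSketch {
    private final Map<String, List<Long>> index = new ConcurrentHashMap<>();

    private final AtomicInteger busyCount = new AtomicInteger();

    private volatile boolean closed;

    void put(String key, List<Long> rowIds) {
        index.put(key, rowIds);
    }

    /** Public method: the only change is the busy-lock wrapper around the old body. */
    List<Long> get(String key) {
        return busy(() -> get0(key));
    }

    /** The original body, moved here as-is so that its lines do not show up in the diff. */
    private List<Long> get0(String key) {
        return index.getOrDefault(key, List.of());
    }

    /** Runs the supplier while counted as busy; fails fast once the storage is closed. */
    private <V> V busy(Supplier<V> supplier) {
        busyCount.incrementAndGet();

        try {
            if (closed) {
                throw new IllegalStateException("Storage is closed");
            }

            return supplier.get();
        } finally {
            busyCount.decrementAndGet();
        }
    }

    /** Rejects new calls, then waits for calls already in flight. */
    void close() {
        closed = true;

        while (busyCount.get() != 0) {
            Thread.onSpinWait();
        }
    }
}
```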





[GitHub] [ignite-3] tkalkirill merged pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill merged PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1073651359


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {

Review Comment:
   I meant the method of the base class.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));

Review Comment:
   Extract method?
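In this hunk the partition files are deleted asynchronously, and the meta, free lists and trees are rebuilt in a `thenAccept` continuation. The review question suggests pulling that rebuild step into its own method. The sketch below is a minimal illustration of that shape and needs Java 16+ for the record. The `recreateDataStructures` and `Structures` names are hypothetical, and `destroyPartitionPhysically` is a stand-in that completes immediately.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical table storage that clears a partition and rebuilds its structures. */
class RebuildSketch {
    /** Stand-in for the per-partition meta, free lists and trees. */
    record Structures(int partitionId, int generation) {}

    final Map<Integer, Structures> partitions = new ConcurrentHashMap<>();

    CompletableFuture<Void> clearStorageAndUpdateDataStructures(int partitionId) {
        return destroyPartitionPhysically(partitionId)
                .thenAccept(unused -> partitions.put(partitionId, recreateDataStructures(partitionId)));
    }

    /** Extracted step: builds fresh structures for a partition whose files were just deleted. */
    private Structures recreateDataStructures(int partitionId) {
        Structures previous = partitions.get(partitionId);

        int generation = previous == null ? 0 : previous.generation() + 1;

        return new Structures(partitionId, generation);
    }

    /** Stand-in for deleting the partition file and its delta files. */
    private CompletableFuture<Void> destroyPartitionPhysically(int partitionId) {
        return CompletableFuture.completedFuture(null);
    }
}
```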





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072185555


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;
 
-    protected TablesConfiguration tablesConfiguration;
+    static {
+        try {
+            CLOSED = MethodHandles.lookup().findVarHandle(AbstractPageMemoryTableStorage.class, "closed", boolean.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    protected final TableConfiguration tableConfig;
 
-    protected volatile boolean started;
+    protected final TablesConfiguration tablesConfig;
 
     protected volatile AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
 
-    protected final ConcurrentMap<Integer, CompletableFuture<Void>> partitionIdDestroyFutureMap = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();
+
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    protected final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** To avoid double closure. */
+    @SuppressWarnings("unused")
+    protected volatile boolean closed;

Review Comment:
   Restored it.
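The restored `closed` field, together with the `CLOSED` `VarHandle`, is the standard JDK idiom for a one-shot close flag. Below is a standalone sketch of that idiom. `CloseOnceSketch` and its `closeCount` counter are hypothetical; `MethodHandles.lookup().findVarHandle(...)` and `VarHandle.compareAndSet(...)` are JDK APIs available since Java 9.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Hypothetical resource whose close logic must run at most once. */
class CloseOnceSketch {
    private static final VarHandle CLOSED;

    static {
        try {
            CLOSED = MethodHandles.lookup().findVarHandle(CloseOnceSketch.class, "closed", boolean.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Read and written only through the CLOSED VarHandle. */
    @SuppressWarnings("unused")
    private volatile boolean closed;

    /** Counts how many times the close logic actually ran. */
    int closeCount;

    /** Returns true only for the call that performed the close. */
    boolean close() {
        if (!CLOSED.compareAndSet(this, false, true)) {
            return false;
        }

        closeCount++;

        return true;
    }
}
```

Compared with an `AtomicBoolean` field, the `VarHandle` keeps the flag as a plain `volatile` field of the object, so no extra object is allocated per storage.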





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072257607


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                boolean hasNext = this.hasNext;
+                    boolean hasNext = this.hasNext;
 
-                if (!hasNext) {
-                    throw new NoSuchElementException();
-                }
+                    if (!hasNext) {
+                        throw new NoSuchElementException();
+                    }
 
-                this.hasNext = null;
+                    this.hasNext = null;
 
-                return toIndexRowImpl(treeRow);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return toIndexRowImpl(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public @Nullable IndexRow peek() {
-            if (hasNext != null) {
-                if (hasNext) {
-                    return toIndexRowImpl(treeRow);
+            return busy(() -> {

Review Comment:
   Same, but why not use a generic template for this class?
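One reading of the "generic template" question is a single generic cursor wrapper that routes every call through the busy lock, instead of repeating `busy(() -> ...)` in each cursor method. The sketch below is minimal and works under that assumption. `BusyGuard` stands in for the storage's `busy(...)` helper, and `java.util.Iterator` stands in for Ignite's `Cursor`; both are hypothetical substitutions.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical abstraction of "run this under the storage's busy lock". */
interface BusyGuard {
    <V> V busy(Supplier<V> supplier);
}

/** Generic cursor wrapper: every call to the delegate goes through the guard. */
final class BusyCursor<T> implements Iterator<T> {
    private final Iterator<T> delegate;

    private final BusyGuard guard;

    BusyCursor(Iterator<T> delegate, BusyGuard guard) {
        this.delegate = delegate;
        this.guard = guard;
    }

    /** Convenience factory for wrapping a list's iterator. */
    static <T> BusyCursor<T> over(List<T> list, BusyGuard guard) {
        return new BusyCursor<>(list.iterator(), guard);
    }

    @Override
    public boolean hasNext() {
        return guard.busy(delegate::hasNext);
    }

    @Override
    public T next() {
        return guard.busy(delegate::next);
    }
}
```

Because `BusyGuard.busy` is a generic method, it cannot be implemented with a lambda; an anonymous class or a method on the storage itself is needed.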





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072251321


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });

Review Comment:
   Same, but why not use a generic template for this class?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072580281


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
+
+/**
+ * Helper class for {@link PageMemory}-based storages.
+ */
+public class PageMemoryStorageUtils {
+    /**
+     * Runs a function under the busy lock; if the busy lock cannot be entered, throws an exception depending on the
+     * {@link StorageState}.
+     *
+     * @param <V> Type of the returned value.
+     * @param busyLock Busy lock.
+     * @param supplier Function.
+     * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+     * @param storageStateSupplier Storage state supplier.
+     * @return Value.
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageRebalanceException If storage is in the process of rebalancing.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static <V> V inBusyLock(
+            IgniteSpinBusyLock busyLock,
+            Supplier<V> supplier,
+            Supplier<StorageState> storageStateSupplier,

Review Comment:
   I tried to change it a little.
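The helper in this hunk runs a supplier under the busy lock. When the lock cannot be entered, it chooses the exception from the current `StorageState`. Below is a self-contained sketch of that control flow, which is not the actual `PageMemoryStorageUtils` code. The nested exception classes are local stand-ins for `StorageClosedException`, `StorageRebalanceException` and `StorageException`. The busy lock is reduced to an enter/leave pair of functions, and the messages are illustrative.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch of running code under a busy lock with state-dependent failures. */
class BusyLockUtilsSketch {
    enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    static class StorageException extends RuntimeException {
        StorageException(String message) {
            super(message);
        }
    }

    static class StorageClosedException extends StorageException {
        StorageClosedException(String message) {
            super(message);
        }
    }

    static class StorageRebalanceException extends StorageException {
        StorageRebalanceException(String message) {
            super(message);
        }
    }

    /** Runs the supplier if the lock can be entered; otherwise throws based on the storage state. */
    static <V> V inBusyLock(
            BooleanSupplier enterBusy,
            Runnable leaveBusy,
            Supplier<V> supplier,
            Supplier<StorageState> storageStateSupplier,
            Supplier<String> storageInfoSupplier
    ) {
        if (!enterBusy.getAsBoolean()) {
            throw exceptionFor(storageStateSupplier.get(), storageInfoSupplier.get());
        }

        try {
            return supplier.get();
        } finally {
            leaveBusy.run();
        }
    }

    /** Maps the storage state to the exception type a caller should see. */
    static StorageException exceptionFor(StorageState state, String storageInfo) {
        switch (state) {
            case CLOSED:
                return new StorageClosedException("Storage is already closed: [" + storageInfo + ']');
            case REBALANCE:
                return new StorageRebalanceException("Storage is being rebalanced: [" + storageInfo + ']');
            default:
                return new StorageException("Unexpected storage state " + state + ": [" + storageInfo + ']');
        }
    }
}
```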





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072431782


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);
+
+        PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);

Review Comment:
   Added TODO.
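The TODO concerns the bounded wait (`get(10, TimeUnit.SECONDS)` instead of `join()`) on the physical destruction future. Below is a minimal sketch of that pattern. It assumes a hypothetical helper named `awaitBounded` and uses `RuntimeException` as a stand-in for the PR's `StorageException`. It is not the code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {
    /**
     * Waits for the future for at most the given time (hypothetical helper).
     * RuntimeException stands in for the StorageException used in the PR.
     */
    static <T> T awaitBounded(CompletableFuture<T> future, long timeout, TimeUnit unit, String what) {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for " + what, e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure as the cause.
            throw new RuntimeException("Failed: " + what, e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out waiting for " + what, e);
        }
    }
}
```

Unlike `join()`, a stuck destruction surfaces as an error after the timeout instead of blocking the calling thread indefinitely. Restoring the interrupt flag keeps interruption visible to callers.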



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);
+
+        PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new StorageException(
+                        IgniteStringFormatter.format(
+                                "Error when physically destroying a partition: [table={}, partitionId={}]",
+                                getTableName(),
+                                groupPartitionId.getPartitionId()
+                        ),
+                        e
+                );
+            }
+
+            return getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+        } else {
+            return partitionMeta;
+        }
+    }
+
+    private void waitPartitionToBeDestroyed(int partitionId) {
+        CompletableFuture<Void> partitionDestroyFuture = destroyFutureByPartitionId.get(partitionId);
+
+        if (partitionDestroyFuture != null) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                partitionDestroyFuture.get(10, TimeUnit.SECONDS);

Review Comment:
   Added TODO.
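`waitPartitionToBeDestroyed` waits only when `destroyFutureByPartitionId` holds a future for the partition. The excerpt does not show how entries are added or removed, so the following sketch rests on an assumption. The hypothetical `DestroyFutures` class registers the future when destruction starts and removes it on completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class DestroyFutures {
    private final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();

    /** Starts a destruction and keeps its future in the map until it completes. */
    CompletableFuture<Void> startDestroy(int partitionId, Supplier<CompletableFuture<Void>> destroyAction) {
        CompletableFuture<Void> future = destroyAction.get();

        destroyFutureByPartitionId.put(partitionId, future);

        // Remove the entry on completion, but only if it still maps to this exact future.
        future.whenComplete((unused, err) -> destroyFutureByPartitionId.remove(partitionId, future));

        return future;
    }

    /** Waits (bounded) for an in-flight destruction of the partition, if there is one. */
    void waitPartitionToBeDestroyed(int partitionId) {
        CompletableFuture<Void> future = destroyFutureByPartitionId.get(partitionId);

        if (future != null) {
            try {
                future.get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException("Error waiting for partition destruction: " + partitionId, e);
            }
        }
    }

    boolean isDestroying(int partitionId) {
        return destroyFutureByPartitionId.containsKey(partitionId);
    }
}
```

The two-argument `remove(key, value)` prevents deleting an entry that a newer destruction of the same partition may have put in its place.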





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072512178


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());

Review Comment:
   Created a ticket: https://issues.apache.org/jira/browse/IGNITE-18565
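Some context on the ordering in `destroyPartitionPhysically`: two steps run synchronously. The chain then waits for the checkpointer, removes the meta, and composes the file deletion. The sketch below shows that ordering with `thenAccept`/`thenCompose`. The two input futures are hypothetical stand-ins for `checkpointManager().onPartitionDestruction(...)` and `filePageStoreManager().destroyPartition(...)`, and a log replaces the real side effects.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class DestructionChain {
    final List<String> log = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> destroyPartitionPhysically(
            CompletableFuture<Void> checkpointerDone,
            CompletableFuture<Void> filesDeleted
    ) {
        log.add("markToDestroy"); // synchronous: stop using the file page store
        log.add("invalidate");    // synchronous: drop the partition's pages from page memory

        return checkpointerDone
                .thenAccept(unused -> log.add("removeMeta")) // runs once the checkpointer is done
                .thenCompose(unused -> {
                    log.add("destroyFiles");
                    return filesDeleted; // async step: the returned future waits for it
                });
    }
}
```

Because the last step uses `thenCompose`, the returned future completes only after the file deletion future does. With `thenApply`, the result would be a nested future that completes as soon as the deletion has merely started.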





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072225481


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(

Review Comment:
   Replaced with `null` check.
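Below is a sketch of the lookup after this change. It assumes the exception-factory overload is dropped and the caller performs a `null` check instead. `PartitionLookup` and `getOrThrow` are hypothetical names, and `IllegalStateException` stands in for whatever exception the real caller throws.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

class PartitionLookup<S> {
    private final AtomicReferenceArray<S> mvPartitions;

    PartitionLookup(int partitions) {
        mvPartitions = new AtomicReferenceArray<>(partitions);
    }

    void set(int partitionId, S storage) {
        checkPartitionId(partitionId);
        mvPartitions.set(partitionId, storage);
    }

    /** Returns the storage, or null if it has not been created yet. */
    S getOrNull(int partitionId) {
        checkPartitionId(partitionId);
        return mvPartitions.get(partitionId);
    }

    /** Caller-side null check that replaces passing an exception factory into the lookup. */
    S getOrThrow(int partitionId) {
        S storage = getOrNull(partitionId);

        if (storage == null) {
            throw new IllegalStateException(String.format("Partition storage does not exist: [partitionId=%d]", partitionId));
        }

        return storage;
    }

    private void checkPartitionId(int partitionId) {
        int partitions = mvPartitions.length();

        if (partitionId < 0 || partitionId >= partitions) {
            throw new IllegalArgumentException(String.format(
                    "Unable to access partition with id outside of configured range: [partitionId=%d, partitions=%d]",
                    partitionId,
                    partitions
            ));
        }
    }
}
```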





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072181314


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -548,6 +573,51 @@ public void testDestroyTableStorage() throws Exception {
         assertThat(tableStorage.destroy(), willCompleteSuccessfully());
     }
 
+    /**
+     * Checks that if we restart the storages after a crash in the middle of a rebalance, the storages will be empty.
+     */
+    @Test
+    public void testRestartStoragesAfterFailOnMiddleOfRebalance() {

Review Comment:
   Fixed.
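The test covers recovery from a crash mid-rebalance. The storage detects that case with `lastAppliedIndex() == REBALANCE_IN_PROGRESS` (see `getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted` in the diff earlier in this thread). Below is a simplified in-memory sketch of that sentinel check. The `-1` value and the `RebalanceRecovery` class are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class RebalanceRecovery {
    /** Assumed sentinel value; the real constant is MvPartitionStorage.REBALANCE_IN_PROGRESS. */
    static final long REBALANCE_IN_PROGRESS = -1;

    /** Simplified "persisted" partition: applied index plus rows. */
    static final class Partition {
        long lastAppliedIndex;
        final Map<String, String> rows = new HashMap<>();
    }

    static void startRebalance(Partition p) {
        p.rows.clear();
        // Persisted marker: if the node crashes before finishing, a restart will see it.
        p.lastAppliedIndex = REBALANCE_IN_PROGRESS;
    }

    static void finishRebalance(Partition p, long lastAppliedIndex) {
        p.lastAppliedIndex = lastAppliedIndex;
    }

    /** Called on restart: a partition left mid-rebalance is recreated empty. */
    static Partition recover(Partition persisted) {
        if (persisted.lastAppliedIndex == REBALANCE_IN_PROGRESS) {
            return new Partition();
        }
        return persisted;
    }
}
```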





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072257176


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   Same, but why not use a generic template for this class?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   Same, but why not use a generic template for this class?
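On the question of a generic template: one option is a single generic `busy(Supplier<V>)` helper, which is what the new `hasNext()`/`next()` bodies already call. The sketch below uses a `ReentrantReadWriteLock`-based stand-in for `IgniteSpinBusyLock`. `BusyGuard` is hypothetical and is not the Ignite class.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class BusyGuard {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private volatile boolean blocked;

    /** Runs the action inside the busy lock, or fails if the guard is blocked (storage closed). */
    <V> V busy(Supplier<V> action) {
        if (blocked || !lock.readLock().tryLock()) {
            throw new IllegalStateException("Storage is closed");
        }

        try {
            return action.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Rejects new operations and waits for in-flight ones to finish. */
    void block() {
        blocked = true;
        lock.writeLock().lock();
    }

    /** Must be called by the thread that called block(). */
    void unblock() {
        lock.writeLock().unlock();
        blocked = false;
    }
}
```

`Supplier` cannot throw checked exceptions, which is why the diff still catches `IgniteInternalCheckedException` inside each lambda.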





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072470190


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing the storage rebalance.
      */
-    private void throwStorageClosedException() {
-        throw new StorageClosedException();
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param hashIndexTree Hash index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.hashIndexTree.close();

Review Comment:
   I'll try to close them at the beginning of the rebalancing.
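The rebalance lifecycle in this hunk is a small state machine driven by `compareAndSet`. Below is a simplified sketch that uses `IllegalStateException` in place of `StorageRebalanceException` and omits the busy-lock drain.

```java
import java.util.concurrent.atomic.AtomicReference;

class IndexStorageState {
    enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    void startRebalance() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
            throw new IllegalStateException("Cannot start rebalance in state " + state.get());
        }
        // In the PR, busyLock.block() followed by busyLock.unblock() here drains in-flight operations.
    }

    void completeRebalance() {
        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
            throw new IllegalStateException("Cannot complete rebalance in state " + state.get());
        }
    }

    /** Returns false if the storage was not RUNNABLE; the PR asserts the state is then CLOSED. */
    boolean close() {
        return state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED);
    }

    StorageState state() {
        return state.get();
    }
}
```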





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072480105


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing the storage rebalance.
+     */
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param sortedIndexTree Sorted index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.sortedIndexTree.close();

Review Comment:
   I'll try to close them at the beginning of the rebalancing.
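Below is a sketch of the approach mentioned in the reply: the old tree is closed when rebalancing starts, so `updateDataStructuresOnRebalance` only publishes the new one. `RebalanceSwap` and its nested `Tree` are hypothetical stand-ins for the index storage and `SortedIndexTree`. The `volatile` flag simplifies the real state handling.

```java
class RebalanceSwap {
    /** Hypothetical stand-in for SortedIndexTree / HashIndexTree. */
    static final class Tree implements AutoCloseable {
        volatile boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    private volatile Tree tree = new Tree();

    private volatile boolean rebalancing;

    void startRebalance() {
        rebalancing = true;
        // Close the old structure up front, so it cannot be read during the rebalance.
        tree.close();
    }

    void updateDataStructuresOnRebalance(Tree newTree) {
        if (!rebalancing) {
            throw new IllegalStateException("Storage is not in the process of rebalancing");
        }
        // The old tree is already closed; just publish the new one.
        tree = newTree;
    }

    void completeRebalance() {
        rebalancing = false;
    }

    Tree tree() {
        return tree;
    }
}
```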





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1071208048


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -548,6 +573,51 @@ public void testDestroyTableStorage() throws Exception {
         assertThat(tableStorage.destroy(), willCompleteSuccessfully());
     }
 
+    /**
+     * Checks that if we restart the storages after a crash in the middle of a rebalance, the storages will be empty.
+     */
+    @Test
+    public void testRestartStoragesAfterFailOnMiddleOfRebalance() {

Review Comment:
   ```suggestion
       public void testRestartStoragesAfterFailDuringRebalance() {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;
 
-    protected TablesConfiguration tablesConfiguration;
+    static {
+        try {
+            CLOSED = MethodHandles.lookup().findVarHandle(AbstractPageMemoryTableStorage.class, "closed", boolean.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    protected final TableConfiguration tableConfig;
 
-    protected volatile boolean started;
+    protected final TablesConfiguration tablesConfig;
 
     protected volatile AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
 
-    protected final ConcurrentMap<Integer, CompletableFuture<Void>> partitionIdDestroyFutureMap = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();
+
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    protected final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** To avoid double closure. */
+    @SuppressWarnings("unused")
+    protected volatile boolean closed;

Review Comment:
   Almost everyone else calls it `stopGuard`. Why do you break the pattern?
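For comparison, here is a minimal sketch of the `stopGuard` convention the reviewer mentions. It assumes the convention is an `AtomicBoolean stopGuard` field that makes `close()` idempotent. The PR's `VarHandle` over a `volatile boolean` gives the same compare-and-set semantics without an extra object, so only the name breaks the pattern. `StopGuardStorageSketch` and its members are hypothetical, and a `ReentrantReadWriteLock` stands in for `IgniteSpinBusyLock`, whose API is not shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical table storage illustrating the "stopGuard" naming convention. */
class StopGuardStorageSketch {
    /** Set exactly once by the first close() call; later calls see true and return early. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /** Stand-in busy lock: operations share the read lock, close() takes the write lock. */
    private final ReentrantReadWriteLock busyLock = new ReentrantReadWriteLock();

    private volatile boolean blocked;

    /** Runs an operation, or fails fast if the storage is closed. */
    <T> T busy(Supplier<T> operation) {
        // Fails if another thread's close() already holds the write lock.
        if (!busyLock.readLock().tryLock()) {
            throw new IllegalStateException("Storage is closed");
        }
        try {
            // Covers operations that raced with close() or run on the closing thread itself.
            if (blocked) {
                throw new IllegalStateException("Storage is closed");
            }
            return operation.get();
        } finally {
            busyLock.readLock().unlock();
        }
    }

    /** Returns true only for the call that actually closed the storage. */
    boolean close() {
        if (!stopGuard.compareAndSet(false, true)) {
            return false;
        }
        blocked = true;
        // Waits for in-flight operations to finish; the write lock is intentionally never released.
        busyLock.writeLock().lock();
        return true;
    }
}
```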



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(

Review Comment:
   Can it simply return `null`? Why do you need to provide an exception-generating function?
   Another option is to use `Optional` with its `orElseThrow`, for example, if you want to put everything into a single statement.
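Here is a minimal sketch of the `Optional` alternative. It assumes the lookup returns `Optional.ofNullable(...)` and that each caller picks its own exception with `orElseThrow`. `PartitionRegistrySketch`, `find`, and `requireForRebalance` are hypothetical names. `String` stands in for `AbstractPageMemoryMvPartitionStorage`, and `IllegalStateException` stands in for the `StorageException` subclasses used in the PR.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical partition lookup returning Optional instead of taking an exception factory. */
class PartitionRegistrySketch {
    private final AtomicReferenceArray<String> partitions;

    PartitionRegistrySketch(int partitionCount) {
        partitions = new AtomicReferenceArray<>(partitionCount);
    }

    void set(int partitionId, String storage) {
        partitions.set(partitionId, storage);
    }

    /** Returns the storage, or an empty Optional if it has not been created yet. */
    Optional<String> find(int partitionId) {
        if (partitionId < 0 || partitionId >= partitions.length()) {
            throw new IllegalArgumentException("Partition ID out of range: " + partitionId);
        }
        return Optional.ofNullable(partitions.get(partitionId));
    }

    /** The caller chooses the exception type at the call site, in a single statement. */
    String requireForRebalance(int partitionId) {
        return find(partitionId).orElseThrow(
                () -> new IllegalStateException("Partition ID " + partitionId + " does not exist"));
    }
}
```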



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link #busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(
+            int partitionId,
+            Function<String, ? extends StorageException> missingStorageExceptionFunction
+    ) {
+        AbstractPageMemoryMvPartitionStorage mvPartitionStorage = getMvPartitionStorageWithoutBusyLock(partitionId);
+
+        if (mvPartitionStorage == null) {
+            throw missingStorageExceptionFunction.apply(IgniteStringFormatter.format("Partition ID {} does not exist", partitionId));
+        }
+
+        return mvPartitionStorage;
+    }
+
+    /**
+     * Returns table name.
+     */
+    public String getTableName() {

Review Comment:
   Off-topic: given that tables can be renamed, we should figure out a way to print logs "consistently". But that's for the future.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -167,86 +203,79 @@ public CompletableFuture<Void> destroy() {
 
     @Override
     public AbstractPageMemoryMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
-        AbstractPageMemoryMvPartitionStorage partition = getMvPartition(partitionId);
+        return inBusyLock(busyLock, () -> {
+            AbstractPageMemoryMvPartitionStorage partition = getMvPartitionStorageWithoutBusyLock(partitionId);
 
-        if (partition != null) {
-            return partition;
-        }
+            if (partition != null) {
+                return partition;
+            }
 
-        partition = createMvPartitionStorage(partitionId);
+            partition = createMvPartitionStorage(partitionId);
 
-        partition.start();
+            partition.start();
 
-        mvPartitions.set(partitionId, partition);
+            mvPartitions.set(partitionId, partition);
 
-        return partition;
+            return partition;
+        });
     }
 
     @Override
     public @Nullable AbstractPageMemoryMvPartitionStorage getMvPartition(int partitionId) {
-        assert started : "Storage has not started yet";
-
-        checkPartitionId(partitionId);
-
-        return mvPartitions.get(partitionId);
+        return inBusyLock(busyLock, () -> getMvPartitionStorageWithoutBusyLock(partitionId));
     }
 
     @Override
     public CompletableFuture<Void> destroyPartition(int partitionId) {
-        assert started : "Storage has not started yet";
+        return inBusyLock(busyLock, () -> {

Review Comment:
   I would recommend extracting such big closures into methods like "destroyPartitionBusy". This would make the PR smaller and simplify the review. The indentation would shrink as well, and that's always a good thing. Right now it's hard to tell whether you just reformatted the code or changed something in it while doing so.
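Here is a minimal sketch of the suggested shape. The public method only applies the guard, and the body lives in a `...Busy` method at normal indentation, so a diff of the body stays readable. `PartitionDestroyerSketch`, `destroyPartitionBusy`, and `inBusy` are hypothetical. `inBusy` is a simplified stand-in for `IgniteUtils.inBusyLock`: it only rejects calls after `stop()` and does not wait for in-flight operations.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical storage showing the "thin guard + xxxBusy body" structure. */
class PartitionDestroyerSketch {
    private final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();

    private volatile boolean stopped;

    /** Public entry point: only guards; the logic lives in destroyPartitionBusy. */
    CompletableFuture<Void> destroyPartition(int partitionId) {
        return inBusy(() -> destroyPartitionBusy(partitionId));
    }

    /** Body at normal indentation, easy to compare with the pre-refactoring version. */
    private CompletableFuture<Void> destroyPartitionBusy(int partitionId) {
        CompletableFuture<Void> future = new CompletableFuture<>();

        CompletableFuture<Void> previous = destroyFutureByPartitionId.putIfAbsent(partitionId, future);

        if (previous != null) {
            return previous; // Destruction already in progress or done.
        }

        // Real code would close the partition and delete its files here.
        future.complete(null);

        return future;
    }

    /** Simplified guard: rejects calls after stop(), does not wait for running operations. */
    private <T> T inBusy(Supplier<T> action) {
        if (stopped) {
            throw new IllegalStateException("Storage is stopped");
        }
        return action.get();
    }

    void stop() {
        stopped = true;
    }
}
```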



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));

Review Comment:
   So, it's a copy-paste after all. Why?
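One way to avoid the duplication, sketched under assumptions, is to let both the creation path and the rebalance-clearing path call a single factory that builds the per-partition structures from the partition meta. `PartitionStructuresSketch`, `PartitionFactorySketch`, and the "meta generation" counter are hypothetical. The strings stand in for the free lists and trees that `createRowVersionFreeList`, `createVersionChainTree`, and the other helpers build in the PR.

```java
/** Hypothetical bundle of per-partition structures (free lists and trees in the real code). */
final class PartitionStructuresSketch {
    final long metaGeneration;
    final String versionChainTree;
    final String indexMetaTree;

    PartitionStructuresSketch(long metaGeneration, String versionChainTree, String indexMetaTree) {
        this.metaGeneration = metaGeneration;
        this.versionChainTree = versionChainTree;
        this.indexMetaTree = indexMetaTree;
    }
}

/** Hypothetical storage where both code paths share one factory method. */
class PartitionFactorySketch {
    /** The only place that knows how to build the structures from partition meta. */
    static PartitionStructuresSketch createDataStructures(int partitionId, long metaGeneration) {
        String suffix = "-p" + partitionId + "-g" + metaGeneration;

        return new PartitionStructuresSketch(metaGeneration, "versionChainTree" + suffix, "indexMetaTree" + suffix);
    }

    /** Path 1: initial partition creation. */
    static PartitionStructuresSketch createPartition(int partitionId) {
        return createDataStructures(partitionId, 0);
    }

    /** Path 2: clearing during rebalance reuses the same factory with fresh meta. */
    static PartitionStructuresSketch clearForRebalance(PartitionStructuresSketch old, int partitionId) {
        return createDataStructures(partitionId, old.metaGeneration + 1);
    }
}
```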



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -96,49 +98,22 @@ public boolean isVolatile() {
 
     @Override
     protected void finishDestruction() {
-        dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
+        dataRegion.pageMemory().onGroupDestroyed(tableConfig.tableId().value());
     }
 
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
-        CompletableFuture<Void> partitionDestroyFuture = partitionIdDestroyFutureMap.get(partitionId);
+        waitPartitionToBeDestroyed(partitionId);
 
-        if (partitionDestroyFuture != null) {
-            try {
-                // Time is chosen randomly (long enough) so as not to call #join().
-                partitionDestroyFuture.get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException("Error waiting for the destruction of the previous version of the partition: " + partitionId, e);
-            }
-        }
+        TableView tableView = tableConfig.value();
 
-        TableView tableView = tableCfg.value();
+        GroupPartitionId groupPartitionId = createGroupPartitionId(partitionId);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableView.tableId(), partitionId);
+        PartitionMeta meta = getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(groupPartitionId);

Review Comment:
   Why did you separate it into a different checkpoint lock section? Won't that break things?
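For context on the question, here is a minimal sketch that keeps the meta read and the structure creation inside one read-lock section, so that no checkpoint can start between them. `ReentrantReadWriteLock` stands in for `CheckpointTimeoutLock`, and the strings stand in for `PartitionMeta` and the structures built from it. Whether splitting the section actually breaks anything depends on checkpoint invariants that are not shown in this excerpt.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical storage doing all partition initialization under a single checkpoint read lock. */
class CheckpointLockSketch {
    /** Stand-in for the checkpoint lock: page updates share the read lock, a checkpoint takes the write lock. */
    private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    <V> V inCheckpointLock(Supplier<V> supplier) {
        checkpointLock.readLock().lock();
        try {
            return supplier.get();
        } finally {
            checkpointLock.readLock().unlock();
        }
    }

    /** Reads the meta and builds the structures in ONE section: no checkpoint can start in between. */
    String createPartition(int partitionId) {
        return inCheckpointLock(() -> {
            if (checkpointLock.getReadHoldCount() == 0) {
                throw new IllegalStateException("Checkpoint read lock must be held");
            }
            String meta = "meta-" + partitionId;   // Hypothetical: read or create the partition meta.
            return "structures(" + meta + ")";     // Hypothetical: free lists and trees built from that meta.
        });
    }

    int readHoldCount() {
        return checkpointLock.getReadHoldCount();
    }
}
```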



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -210,7 +177,8 @@ private FilePageStore ensurePartitionFilePageStore(TableView tableView, GroupPar
     /**
      * Returns id of the last started checkpoint, or {@code null} if no checkpoints were started yet.
      */
-    public @Nullable UUID lastCheckpointId() {
+    @Nullable
+    private UUID lastCheckpointId() {

Review Comment:
   OK, now I'm confused! You always attributed the `Nullable` annotation to types, not methods, and now you change it back. Why? I don't get it. Is this stated in the code style guidelines, so you just have to change it? I don't think so.
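For reference, when an annotation targets both `METHOD` and `TYPE_USE`, the two placements in the diff mean the same thing. Per the JLS rules for annotations that are applicable in both declaration and type contexts, the annotation then applies to the method and to its return type. Recent `org.jetbrains.annotations.Nullable` versions are assumed to target both. The sketch below uses a hypothetical local `@Nullable` with that targeting and checks both placements via reflection, which supports treating the question as a style choice rather than a semantic one.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.UUID;

/** Hypothetical annotation with the dual targeting assumed for org.jetbrains.annotations.Nullable. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE_USE})
@interface Nullable {
}

class NullablePlacementSketch {
    /** Type-use style: the annotation sits right before the return type. */
    public @Nullable UUID typeStyle() {
        return null;
    }

    /** Declaration style: the annotation precedes the modifiers. */
    @Nullable
    public UUID declarationStyle() {
        return null;
    }

    /** True if the annotation is present on the method AND on its return type. */
    static boolean annotatedBothWays(String methodName) {
        try {
            Method m = NullablePlacementSketch.class.getMethod(methodName);
            return m.isAnnotationPresent(Nullable.class)
                    && m.getAnnotatedReturnType().isAnnotationPresent(Nullable.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(methodName, e);
        }
    }
}
```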



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);
+
+        PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new StorageException(
+                        IgniteStringFormatter.format(
+                                "Error when physically destroying a partition: [table={}, partitionId={}]",
+                                getTableName(),
+                                groupPartitionId.getPartitionId()
+                        ),
+                        e
+                );
+            }
+
+            return getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+        } else {
+            return partitionMeta;
+        }
+    }
+
+    private void waitPartitionToBeDestroyed(int partitionId) {
+        CompletableFuture<Void> partitionDestroyFuture = destroyFutureByPartitionId.get(partitionId);
+
+        if (partitionDestroyFuture != null) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                partitionDestroyFuture.get(10, TimeUnit.SECONDS);

Review Comment:
   Same thing here.
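The reviewer's earlier remark on this pattern is outside this excerpt. As one hedged illustration, here is a sketch of the same wait that names the timeout instead of justifying the literal with "chosen randomly", restores the interrupt flag, and unwraps `ExecutionException`. `FutureWaitSketch` and `DESTROY_TIMEOUT_SECONDS` are hypothetical, and `IllegalStateException` stands in for `StorageException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for waiting on a previous partition destruction. */
class FutureWaitSketch {
    /** Named constant instead of an unexplained "10 seconds" literal. */
    static final long DESTROY_TIMEOUT_SECONDS = 10;

    static void awaitDestruction(CompletableFuture<Void> future, int partitionId) {
        if (future == null) {
            return; // Nothing to wait for.
        }
        try {
            future.get(DESTROY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status for callers.
            throw new IllegalStateException("Interrupted while waiting for partition " + partitionId, e);
        } catch (ExecutionException e) {
            // Report the real failure rather than the wrapper.
            throw new IllegalStateException("Destruction of partition " + partitionId + " failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out waiting for partition " + partitionId, e);
        }
    }
}
```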



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here
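Applying the same extraction to `put`, here is a sketch under assumptions. A `Runnable` overload of the guard also removes the `return null;` that the `Supplier`-based `busy(...)` forces on void methods. `HashIndexSketch`, `putBusy`, and both `busy` overloads are hypothetical, and the list stands in for `hashIndexTree`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical index storage: thin public methods, bodies in xxxBusy methods. */
class HashIndexSketch {
    private final List<String> rows = new ArrayList<>();

    private volatile boolean closed;

    public void put(String row) {
        busy(() -> putBusy(row));
    }

    private void putBusy(String row) {
        rows.add(row); // Stand-in for hashIndexTree.invoke(...).
    }

    /** Void overload, so void operations need no "return null;". */
    private void busy(Runnable action) {
        busy(() -> {
            action.run();
            return null;
        });
    }

    /** Simplified guard: rejects calls after close(), does not wait for running operations. */
    private <V> V busy(Supplier<V> action) {
        if (closed) {
            throw new IllegalStateException("Storage is closed");
        }
        return action.get();
    }

    int size() {
        return rows.size();
    }

    void close() {
        closed = true;
    }
}
```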



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   This comment is not enough.
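To illustrate the kind of explanation the reviewer might be asking for, here is a sketch of the `RUNNABLE -> REBALANCE` transition with a comment that says how ongoing operations are stopped. It assumes a read/write lock in place of `IgniteSpinBusyLock`. The PR's actual mechanism is only partially visible here, so `RebalanceableIndexSketch`, `StorageStateSketch`, and `finishRebalance` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

enum StorageStateSketch { RUNNABLE, REBALANCE, CLOSED }

/** Hypothetical index storage with a rebalance state transition. */
class RebalanceableIndexSketch {
    private final AtomicReference<StorageStateSketch> state = new AtomicReference<>(StorageStateSketch.RUNNABLE);

    /** Stand-in busy lock: operations take the read lock, startRebalance() takes the write lock. */
    private final ReentrantReadWriteLock busyLock = new ReentrantReadWriteLock();

    <V> V busy(Supplier<V> operation) {
        busyLock.readLock().lock();
        try {
            StorageStateSketch current = state.get();
            if (current != StorageStateSketch.RUNNABLE) {
                throw new IllegalStateException("Storage is not runnable: " + current);
            }
            return operation.get();
        } finally {
            busyLock.readLock().unlock();
        }
    }

    void startRebalance() {
        if (!state.compareAndSet(StorageStateSketch.RUNNABLE, StorageStateSketch.REBALANCE)) {
            throw new IllegalStateException("Cannot start rebalance in state " + state.get());
        }
        // The state is now REBALANCE, so operations entering busy() from here on fail fast.
        // Operations that passed the state check before the CAS may still be using the old trees;
        // acquiring the write lock waits until every one of them has left busy(). Only then is it
        // safe to clear and recreate the structures. The lock is released afterwards so that
        // operations can resume once the state returns to RUNNABLE.
        busyLock.writeLock().lock();
        try {
            // Hypothetical: clear and recreate the index tree here.
        } finally {
            busyLock.writeLock().unlock();
        }
    }

    void finishRebalance() {
        if (!state.compareAndSet(StorageStateSketch.REBALANCE, StorageStateSketch.RUNNABLE)) {
            throw new IllegalStateException("No rebalance in progress: " + state.get());
        }
    }

    StorageStateSketch state() {
        return state.get();
    }
}
```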



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, row.indexColumns().byteBuffer());
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, row.indexColumns().byteBuffer());
+                HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, row.rowId());
 
-            HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, row.rowId());
+                var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, freeList, hashIndexTree.inlineSize());
 
-            var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, freeList, hashIndexTree.inlineSize());
+                hashIndexTree.invoke(hashIndexRow, null, insert);
 
-            hashIndexTree.invoke(hashIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;

Review Comment:
   Maybe an `AtomicBoolean`? This is not a hot path and the memory footprint difference is small; we have other places to worry about.

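For comparison, here is a minimal sketch of the two ways to implement a close-once flag. The first uses a static `VarHandle` over a `volatile boolean` field, which is what the diff's `CLOSED` constant suggests. The second uses the `AtomicBoolean` the reviewer proposes. The class and field names are assumptions, not the PR's code. Both give the same compare-and-set semantics. The `VarHandle` version saves one extra object per instance but needs reflective setup code.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicBoolean;

/** Close-once flag via a VarHandle on a volatile field (hypothetical class). */
class VarHandleCloseable {
    private static final VarHandle CLOSED;

    static {
        try {
            CLOSED = MethodHandles.lookup().findVarHandle(VarHandleCloseable.class, "closed", boolean.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @SuppressWarnings("unused") // Accessed only through the CLOSED VarHandle.
    private volatile boolean closed;

    /** Returns true only for the first caller; later calls see the flag already set. */
    boolean close() {
        return CLOSED.compareAndSet(this, false, true);
    }
}

/** The same flag via AtomicBoolean (hypothetical class): one extra object, no reflective setup. */
class AtomicBooleanCloseable {
    private final AtomicBoolean closed = new AtomicBoolean();

    /** Returns true only for the first caller. */
    boolean close() {
        return closed.compareAndSet(false, true);
    }
}
```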


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
+
+/**
+ * Helper class for {@link PageMemory}-based storages.
+ */
+public class PageMemoryStorageUtils {
+    /**
+     * Runs a function under a busy lock; if the busy lock cannot be acquired, throws an exception depending on the current
+     * {@link StorageState}.
+     *
+     * @param <V> Type of the returned value.
+     * @param busyLock Busy lock.
+     * @param supplier Function.
+     * @param storageStateSupplier Storage state supplier.
+     * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+     * @return Value.
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageRebalanceException If storage is in the process of rebalancing.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static <V> V inBusyLock(
+            IgniteSpinBusyLock busyLock,
+            Supplier<V> supplier,
+            Supplier<StorageState> storageStateSupplier,

Review Comment:
   Why can't we pass the state itself?

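One plausible reason for a `Supplier` is timing. The state should be read only after acquiring the busy lock fails, not at the call site. Reading it earlier could misreport a concurrent close or rebalance start as `RUNNABLE`. Passing the state holder itself (an `AtomicReference`) keeps that property with a simpler signature. The sketch below assumes this reading. All names are hypothetical, `CLOSED` is an assumed state, a `ReentrantReadWriteLock` stands in for `IgniteSpinBusyLock`, and plain JDK exceptions stand in for the storage exceptions.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical states: RUNNABLE and REBALANCE appear in the diff, CLOSED is assumed. */
enum SketchStorageState { RUNNABLE, REBALANCE, CLOSED }

final class BusyLockWithState {
    /**
     * Runs {@code body} under the read lock. The state holder is passed in, and the state is read only
     * after acquisition fails, so the exception reflects the state that caused the failure.
     */
    static <V> V inBusyLock(
            ReentrantReadWriteLock busyLock,
            Supplier<V> body,
            AtomicReference<SketchStorageState> state,
            String storageInfo
    ) {
        if (!busyLock.readLock().tryLock()) {
            throw exceptionFor(state.get(), storageInfo);
        }

        try {
            return body.get();
        } finally {
            busyLock.readLock().unlock();
        }
    }

    /** Maps the observed state to an exception. */
    static RuntimeException exceptionFor(SketchStorageState state, String storageInfo) {
        switch (state) {
            case CLOSED:
                return new IllegalStateException("Storage is closed: [" + storageInfo + "]");
            case REBALANCE:
                return new IllegalStateException("Storage is in process of rebalance: [" + storageInfo + "]");
            default:
                return new IllegalStateException("Unexpected storage state " + state + ": [" + storageInfo + "]");
        }
    }
}
```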


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -811,8 +885,8 @@ private void checkForPresenceRows(
     ) {
         for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
             assertThat(
-                    getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
-                    containsInAnyOrder(row.get2())
+                    toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1())),
+                    containsInAnyOrder(row.get2().bytes())

Review Comment:
   I believe that you may have a bug here. `byte[]` cannot be compared with `equals`, so this assertion is very suspicious

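Background for this concern: Java arrays inherit `Object.equals`, which compares identity. Two `byte[]` with the same contents are therefore not `equals`, while `java.util.Arrays.equals` compares them element by element. Whether this particular assertion is affected depends on how the matcher compares elements. Hamcrest's `IsEqual` special-cases arrays, so it is worth confirming before changing the test. The helper below is a hypothetical order-insensitive content comparison, not code from the PR.

```java
import java.util.Arrays;
import java.util.List;

final class ByteArrayEquality {
    /** Returns true if both lists contain the same byte arrays by content, in any order (simple O(n^2) check). */
    static boolean sameContentsAnyOrder(List<byte[]> actual, List<byte[]> expected) {
        if (actual.size() != expected.size()) {
            return false;
        }

        // Tracks which actual elements were already matched, so duplicates are counted correctly.
        boolean[] used = new boolean[actual.size()];

        outer:
        for (byte[] e : expected) {
            for (int i = 0; i < actual.size(); i++) {
                if (!used[i] && Arrays.equals(actual.get(i), e)) {
                    used[i] = true;
                    continue outer;
                }
            }

            return false;
        }

        return true;
    }
}
```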


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -88,25 +92,46 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
     /** Partition id for 1 storage. */
     protected static final int PARTITION_ID_1 = 1 << 8;
 
-    private MvTableStorage tableStorage;
+    protected MvTableStorage tableStorage;
 
     private TableIndexView sortedIdx;
 
     private TableIndexView hashIdx;
 
+    private StorageEngine storageEngine;
+
     /**
      * Initializes the internal structures needed for tests.
      *
      * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
      */
-    protected final void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
-        createTestTable(tableStorage.configuration());
-        createTestIndexes(tablesCfg);
+    protected final void initialize(StorageEngine storageEngine, TablesConfiguration tablesConfig) {
+        createTestTable(getTableConfig(tablesConfig));
+        createTestIndexes(tablesConfig);
+
+        this.storageEngine = storageEngine;

Review Comment:
   Do we have start/stop methods for storage engines? I think we do; you should call them then.

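A minimal sketch of the pairing the reviewer suggests. The test base starts the engine when it receives it and stops it on teardown. `SketchStorageEngine` is a hypothetical stand-in, and the real `StorageEngine` interface may differ. In a JUnit 5 test, `tearDown` would carry `@AfterEach`.

```java
/** Hypothetical minimal engine lifecycle; the real StorageEngine interface may differ. */
interface SketchStorageEngine {
    void start();

    void stop();
}

/** Hypothetical test base: start the engine when it is handed over, stop it on teardown. */
abstract class SketchTableStorageTest {
    protected SketchStorageEngine storageEngine;

    protected final void initialize(SketchStorageEngine storageEngine) {
        this.storageEngine = storageEngine;

        storageEngine.start();
    }

    /** Would be annotated with @AfterEach in JUnit 5. */
    protected void tearDown() {
        if (storageEngine != null) {
            storageEngine.stop();
        }
    }
}
```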


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });

Review Comment:
   Is this code really easier to read than the previous one?
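To make the comparison concrete, here is a minimal sketch of the two shapes this PR switches between. The first is an explicit `enterBusy()`/`try`/`finally leaveBusy()` in every method. The second is a `busy(...)` helper that takes the body as a lambda. `SimpleBusyLock` is a hypothetical stand-in for `IgniteSpinBusyLock`, built on `ReentrantReadWriteLock`. Its method names mirror the diff, but it is not the Ignite class.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical busy lock: operations take the read lock; blocking would take the write lock. */
final class SimpleBusyLock {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    /** Returns false if another thread currently blocks the lock (e.g. the storage is closing). */
    boolean enterBusy() {
        return rwLock.readLock().tryLock();
    }

    void leaveBusy() {
        rwLock.readLock().unlock();
    }

    int busyCount() {
        return rwLock.getReadLockCount();
    }
}

final class BusyStyles {
    private final SimpleBusyLock lock = new SimpleBusyLock();

    /** Before: every public method repeats the enter/try/finally/leave boilerplate. */
    int sizeExplicit(String key) {
        if (!lock.enterBusy()) {
            throw new IllegalStateException("Storage is closed");
        }

        try {
            return key.length();
        } finally {
            lock.leaveBusy();
        }
    }

    /** After: the boilerplate lives in one helper and each method passes its body as a lambda. */
    int sizeWithHelper(String key) {
        return busy(() -> key.length());
    }

    private <V> V busy(Supplier<V> body) {
        if (!lock.enterBusy()) {
            throw new IllegalStateException("Storage is closed");
        }

        try {
            return body.get();
        } finally {
            lock.leaveBusy();
        }
    }

    int activeOperations() {
        return lock.busyCount();
    }
}
```

The helper removes the repeated `finally`. However, a `Supplier` cannot throw checked exceptions, so bodies that call into the B+ tree still need their own `try`/`catch` inside the lambda, as in `put` above. That adds a level of nesting rather than removing one.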



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   What's the point of all these changes?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {

Review Comment:
   Maybe you should make that method protected if it's expected to be overridden?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                boolean hasNext = this.hasNext;
+                    boolean hasNext = this.hasNext;
 
-                if (!hasNext) {
-                    throw new NoSuchElementException();
-                }
+                    if (!hasNext) {
+                        throw new NoSuchElementException();
+                    }
 
-                this.hasNext = null;
+                    this.hasNext = null;
 
-                return toIndexRowImpl(treeRow);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return toIndexRowImpl(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public @Nullable IndexRow peek() {
-            if (hasNext != null) {
-                if (hasNext) {
-                    return toIndexRowImpl(treeRow);
+            return busy(() -> {

Review Comment:
   I can repeat it once again if you wish



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });
             }
 
             @Override
             public R next() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);

Review Comment:
   Same question about this code



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -157,35 +141,31 @@ protected AbstractPageMemoryMvPartitionStorage(
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   Let me repeat my question, what's the point?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());

Review Comment:
   So, if we create a partition while the previous one has not yet been destroyed, we would have to wait for the checkpoint without triggering it. Maybe we should fix this in the future. I remember we had an idea to return a future from create/getOrCreatePartition.

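One way to read the idea of returning a future from partition creation is the sketch below. It is hypothetical and not Ignite's API. Creation chains onto any pending destruction of the same partition instead of blocking. If that destruction is still in progress, it also triggers a checkpoint so the wait is not open-ended. `PartitionRegistry`, `destructionStarted`, and the `checkpointTrigger` callback are all assumed names, and a string stands in for the created partition.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: creating a partition waits (asynchronously) for its pending destruction. */
final class PartitionRegistry {
    private final Map<Integer, CompletableFuture<Void>> pendingDestructions = new ConcurrentHashMap<>();

    /** Assumed hook that asks the checkpointer to run soon. */
    private final Runnable checkpointTrigger;

    PartitionRegistry(Runnable checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    /** Registers an in-progress destruction, e.g. one that completes after the next checkpoint. */
    void destructionStarted(int partitionId, CompletableFuture<Void> destruction) {
        pendingDestructions.put(partitionId, destruction);

        // Forget the entry once done; remove(key, value) avoids dropping a newer destruction.
        destruction.whenComplete((unused, err) -> pendingDestructions.remove(partitionId, destruction));
    }

    /** Returns a future that completes once the previous incarnation of the partition is gone. */
    CompletableFuture<String> createPartition(int partitionId) {
        CompletableFuture<Void> pending = pendingDestructions.get(partitionId);

        if (pending == null) {
            return CompletableFuture.completedFuture("partition-" + partitionId);
        }

        if (!pending.isDone()) {
            // Do not just wait for the next scheduled checkpoint: ask for one.
            checkpointTrigger.run();
        }

        return pending.thenApply(unused -> "partition-" + partitionId);
    }
}
```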


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   I don't think this comment properly reflects what you're doing. Basically, you propagate the state you just set to all subsequent storage operations, while also waiting until all operations that saw the previous state have completed.

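A minimal sketch of the behavior the reviewer describes. A `ReentrantReadWriteLock` stands in for `IgniteSpinBusyLock`: operations hold the read lock, and `block()` followed by `unblock()` corresponds to taking and releasing the write lock. The class and enum names are hypothetical. Unlike `enterBusy()`, `lock()` here waits instead of failing fast while the lock is blocked. Either way, an operation that runs after the state change sees `REBALANCE`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical subset of storage states. */
enum RebalanceSketchState { RUNNABLE, REBALANCE }

final class RebalancingStorageSketch {
    private final AtomicReference<RebalanceSketchState> state = new AtomicReference<>(RebalanceSketchState.RUNNABLE);

    private final ReentrantReadWriteLock busyLock = new ReentrantReadWriteLock();

    /** Every operation enters the busy lock first, then checks the state. */
    String read() {
        busyLock.readLock().lock();

        try {
            if (state.get() == RebalanceSketchState.REBALANCE) {
                throw new IllegalStateException("Storage is in process of rebalance");
            }

            return "value";
        } finally {
            busyLock.readLock().unlock();
        }
    }

    void startRebalance() {
        if (!state.compareAndSet(RebalanceSketchState.RUNNABLE, RebalanceSketchState.REBALANCE)) {
            throw new IllegalStateException("Unexpected state: " + state.get());
        }

        // Taking the write lock waits until every operation that entered before the CAS (and may have seen
        // RUNNABLE) has left. Operations entering afterwards see REBALANCE. Nothing is forcibly "stopped".
        busyLock.writeLock().lock();
        busyLock.writeLock().unlock();
    }

    RebalanceSketchState state() {
        return state.get();
    }
}
```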


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -157,35 +141,31 @@ protected AbstractPageMemoryMvPartitionStorage(
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
+                NamedListView<TableIndexView> indexesCfgView = tableStorage.tablesConfiguration().indexes().value();
 
-        try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
-            NamedListView<TableIndexView> indexesCfgView = tablesConfiguration.indexes().value();
+                while (cursor.hasNext()) {
+                    IndexMeta indexMeta = cursor.next();
 
-            while (cursor.hasNext()) {
-                IndexMeta indexMeta = cursor.next();
+                    TableIndexView indexCfgView = getByInternalId(indexesCfgView, indexMeta.id());
 
-                TableIndexView indexCfgView = getByInternalId(indexesCfgView, indexMeta.id());
+                    if (indexCfgView instanceof HashIndexView) {
+                        hashIndexes.put(indexCfgView.id(), createOrRestoreHashIndex(indexMeta));

Review Comment:
   Thank you for the fix!



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing the storage rebalance.
+     */
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param sortedIndexTree Sorted index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.sortedIndexTree.close();

Review Comment:
   Same question here. Looks like a huge flaw in the design.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);

Review Comment:
   Just a nitpick, but what exactly do you ensure? You ensure that the _partition file page store ..._ what? Exists? Is initialized? Is started? The last word is missing.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -800,67 +732,65 @@ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageEx
             } else {
                 return new TimestampCursor(treeCursor, timestamp);
             }
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        });
     }
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
-            return cursor.hasNext() ? cursor.next().rowId() : null;
-        } catch (Exception e) {
-            throw new StorageException("Error occurred while trying to read a row id", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
+                return cursor.hasNext() ? cursor.next().rowId() : null;
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to read a row id", e);
+            }
+        });
     }
 
     @Override
     public long rowsCount() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            return versionChainTree.size();
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error occurred while fetching the size.", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try {
+                return versionChainTree.size();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error occurred while fetching the size", e);
+            }
+        });
     }
 
     private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
-        protected final Cursor<VersionChain> treeCursor;
+        final Cursor<VersionChain> treeCursor;

Review Comment:
   OK, why change the visibility here?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {

Review Comment:
   Dude, can you please come up with a shorter name? Sometimes you should write documentation instead of creating super-long method names. By the way, I see no comments in your code. Is it so simple that it doesn't require any?
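The `inCheckpointLock` helper in the hunk above is a small, self-contained unit of the kind the reviewer wants documented. Below is a minimal sketch of the same pattern with Javadoc. It assumes a plain `ReentrantReadWriteLock` as a stand-in for Ignite's `CheckpointTimeoutLock`. `CheckpointLockSketch` and its methods are hypothetical and are not the PR's code.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical stand-in for a checkpoint lock: a checkpoint takes the write lock, page updates take the read lock. */
public class CheckpointLockSketch {
    private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    /**
     * Runs {@code supplier} while holding the checkpoint read lock, so that a checkpoint cannot start
     * while the supplier is modifying pages.
     *
     * @param supplier Work to run under the lock.
     * @param <V> Result type.
     * @return The supplier's result.
     */
    public <V> V inCheckpointReadLock(Supplier<V> supplier) {
        checkpointLock.readLock().lock();

        try {
            return supplier.get();
        } finally {
            // Released even if the supplier throws.
            checkpointLock.readLock().unlock();
        }
    }

    /** Returns {@code true} if the current thread holds the read lock (compare checkpointLockIsHeldByThread()). */
    public boolean isReadLockHeldByCurrentThread() {
        return checkpointLock.getReadHoldCount() > 0;
    }
}
```

The `try`/`finally` guarantees that the read lock is released even when the supplier throws. The original helper achieves the same thing by calling `checkpointReadUnlock()` in its `finally` block.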



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -987,24 +909,174 @@ public boolean hasNext() {
                     ReadResult result = findLatestRowVersion(chain);
 
                     if (result.isEmpty() && !result.isWriteIntent()) {
-                        continue;
+                        return null;
                     }
 
                     nextRead = result;
                     currentChain = chain;
 
                     return true;
-                } finally {
-                    closeBusyLock.leaveBusy();
+                });
+
+                if (hasNext != null) {
+                    return hasNext;
                 }
             }
         }
     }
 
+    private class ScanVersionsCursor implements Cursor<ReadResult> {
+        final RowId rowId;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private VersionChain versionChain;
+
+        @Nullable
+        private RowVersion rowVersion;
+
+        private ScanVersionsCursor(RowId rowId) {
+            this.rowId = rowId;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                return hasNext;
+            });
+        }
+
+        @Override
+        public ReadResult next() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                hasNext = null;
+
+                return rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
+            });
+        }
+
+        private void advanceIfNeeded() {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+
+            if (hasNext != null) {
+                return;
+            }
+
+            if (versionChain == null) {
+                try {
+                    versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException(e);
+                }
+
+                rowVersion = versionChain == null ? null : readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
+            } else {
+                rowVersion = !rowVersion.hasNextLink() ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
+            }
+
+            hasNext = rowVersion != null;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
+            return;
+        }
+
+        busyLock.block();
+
+        versionChainTree.close();

Review Comment:
   Can we close all resources in one big `closeAll` call, like we do everywhere else?
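One way to follow this suggestion is a single variadic helper that closes every resource, even when an earlier `close()` fails. The sketch below is hypothetical. `CloseAllSketch.closeAll` is not the project's own utility, whose exact signature may differ, and it only illustrates the idea.

```java
/** Hypothetical helper illustrating the "close everything in one call" pattern. */
public class CloseAllSketch {
    /**
     * Closes every non-null resource, even if some of them fail. The first failure is rethrown,
     * and any later failures are attached to it as suppressed exceptions.
     *
     * @param resources Resources to close; {@code null} entries are skipped.
     * @throws Exception The first exception thrown by any {@code close()} call.
     */
    public static void closeAll(AutoCloseable... resources) throws Exception {
        Exception first = null;

        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }

            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }

        if (first != null) {
            throw first;
        }
    }
}
```

In `close()`, this would become a single call over `versionChainTree` and the storage's other trees and free lists. A failure in one `close()` would then no longer leave the remaining structures open.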



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -216,51 +197,41 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
 
     @Override
     public long persistedIndex() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            return persistedIndex;
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        return busy(() -> persistedIndex);
     }
 
     @Override
     @Nullable
     public RaftGroupConfiguration committedGroupConfiguration() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            replicationProtocolGroupConfigReadWriteLock.readLock().lock();
-
+        return busy(() -> {

Review Comment:
   What is changed in this method? Why did you have to refactor it?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -96,49 +98,22 @@ public boolean isVolatile() {
 
     @Override
     protected void finishDestruction() {
-        dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
+        dataRegion.pageMemory().onGroupDestroyed(tableConfig.tableId().value());
     }
 
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
-        CompletableFuture<Void> partitionDestroyFuture = partitionIdDestroyFutureMap.get(partitionId);
+        waitPartitionToBeDestroyed(partitionId);
 
-        if (partitionDestroyFuture != null) {
-            try {
-                // Time is chosen randomly (long enough) so as not to call #join().
-                partitionDestroyFuture.get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException("Error waiting for the destruction of the previous version of the partition: " + partitionId, e);
-            }
-        }
+        TableView tableView = tableConfig.value();
 
-        TableView tableView = tableCfg.value();
+        GroupPartitionId groupPartitionId = createGroupPartitionId(partitionId);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableView.tableId(), partitionId);
+        PartitionMeta meta = getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(groupPartitionId);

Review Comment:
   What's the point of refactoring this code anyway?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -365,4 +315,70 @@ private void syncMetadataOnCheckpoint(@Nullable Executor executor) throws Ignite
             });
         }
     }
+
+    @Override
+    public void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        lastApplied0(lastAppliedIndex, lastAppliedTerm);
+
+        persistedIndex = lastAppliedIndex;
+    }
+
+    /**
+     * Updates the internal data structures of the storage and its indexes on rebalance.
+     *
+     * @param meta Partition meta.
+     * @param rowVersionFreeList Free list for {@link RowVersion}.
+     * @param indexFreeList Free list for {@link IndexColumns}.
+     * @param versionChainTree Table tree for {@link VersionChain}.
+     * @param indexMetaTree Tree that contains SQL indexes' metadata.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(
+            PartitionMeta meta,
+            RowVersionFreeList rowVersionFreeList,
+            IndexColumnsFreeList indexFreeList,
+            VersionChainTree versionChainTree,
+            IndexMetaTree indexMetaTree
+    ) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.meta = meta;
+
+        this.rowVersionFreeList.close();

Review Comment:
   Ok, same question here. Why are these structures not already closed?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -528,20 +516,16 @@ private static byte[] rowBytes(@Nullable BinaryRow row) {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
-        assert rowId.partitionId() == partitionId : rowId;
-
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            assert rowId.partitionId() == partitionId : rowId;
 
-        try {
             VersionChain currentChain = findVersionChain(rowId);
 
             if (currentChain == null) {
                 RowVersion newVersion = insertRowVersion(row, NULL_LINK);
 
-                VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
-                        NULL_LINK);
+                VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId,
+                        newVersion.link(), NULL_LINK);

Review Comment:
   What's the idea behind this change? The old formatting was bad, and the new one is equally bad. Why bother?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing the storage rebalance.
      */
-    private void throwStorageClosedException() {
-        throw new StorageClosedException();
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param hashIndexTree Hash index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.hashIndexTree.close();

Review Comment:
   This is interesting. Why wasn't this object closed before? Something's wrong here.
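The hunk above moves the hash index storage through three states using `compareAndSet`: `RUNNABLE -> REBALANCE` in `startRebalance()`, `REBALANCE -> RUNNABLE` in `completeRebalance()`, and `RUNNABLE -> CLOSED` in `close()`. Below is a minimal sketch of that state machine. It assumes `IllegalStateException` in place of `StorageRebalanceException` and leaves out the busy lock. `StorageStateSketch` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the RUNNABLE / REBALANCE / CLOSED transitions. */
public class StorageStateSketch {
    public enum StorageState { RUNNABLE, REBALANCE, CLOSED }

    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    public StorageState state() {
        return state.get();
    }

    /** RUNNABLE -> REBALANCE; fails from any other state. */
    public void startRebalance() {
        transition(StorageState.RUNNABLE, StorageState.REBALANCE);
    }

    /** REBALANCE -> RUNNABLE; fails from any other state. */
    public void completeRebalance() {
        transition(StorageState.REBALANCE, StorageState.RUNNABLE);
    }

    /**
     * RUNNABLE -> CLOSED. Returns {@code true} only for the call that actually closed the storage;
     * a repeated close is a no-op. The diff asserts that the state is CLOSED here; this sketch throws instead.
     */
    public boolean close() {
        if (state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
            return true;
        }

        StorageState current = state.get();

        if (current != StorageState.CLOSED) {
            throw new IllegalStateException("Cannot close storage in state " + current);
        }

        return false;
    }

    private void transition(StorageState expected, StorageState next) {
        // A single CAS: of two concurrent callers, at most one succeeds.
        if (!state.compareAndSet(expected, next)) {
            // Stand-in for throwExceptionDependingOnStorageStateOnRebalance(...).
            throw new IllegalStateException("Expected state " + expected + " but was " + state.get());
        }
    }
}
```

Each transition is a single CAS, so two concurrent `startRebalance()` calls cannot both succeed. The loser observes the state that the winner set.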



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                SortedIndexRow sortedIndexRow = toSortedIndexRow(row.indexColumns(), row.rowId());
 
-        try {
-            SortedIndexRow sortedIndexRow = toSortedIndexRow(row.indexColumns(), row.rowId());
+                var insert = new InsertSortedIndexRowInvokeClosure(sortedIndexRow, freeList, sortedIndexTree.inlineSize());
 
-            var insert = new InsertSortedIndexRowInvokeClosure(sortedIndexRow, freeList, sortedIndexTree.inlineSize());
+                sortedIndexTree.invoke(sortedIndexRow, null, insert);
 
-            sortedIndexTree.invoke(sortedIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Why did you have to do this? These are changes for the sake of changes. Did you want to artificially inflate your PR? Well, you succeeded in that case. Please roll it back, or at least create a new "get*" method that would reduce the amount of unnecessary changes.
   I feel like half of all changes in this PR are unjustified. I don't like it. Reviewing PRs like this one is a nightmare.
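For context, the `busy(...)` calls replace the `enterBusy()` / `try` / `finally leaveBusy()` block that was previously repeated in every method. Below is a minimal sketch of such a wrapper. It assumes a `ReentrantReadWriteLock` as a stand-in for the project's busy lock. `BusyLockSketch` and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical busy lock: operations share the read lock, block() takes the write lock. */
public class BusyLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Like enterBusy(): returns false instead of waiting if another thread has blocked the lock. */
    public boolean enterBusy() {
        return lock.readLock().tryLock();
    }

    public void leaveBusy() {
        lock.readLock().unlock();
    }

    /** Waits for in-flight operations to leave, then rejects new ones until unblock(). */
    public void block() {
        lock.writeLock().lock();
    }

    public void unblock() {
        lock.writeLock().unlock();
    }

    /** Replaces the enterBusy / try / finally leaveBusy boilerplate repeated in every method. */
    public <V> V busy(Supplier<V> supplier) {
        if (!enterBusy()) {
            // Stand-in for throwing StorageClosedException.
            throw new IllegalStateException("Storage is closed");
        }

        try {
            return supplier.get();
        } finally {
            leaveBusy();
        }
    }
}
```

In the hash index hunk, `startRebalance()` calls `block()` and then immediately calls `unblock()`. With this stand-in, `block()` waits for every `busy(...)` call that is already running, so the pair drains operations that entered before the state change. One caveat applies to the stand-in: `ReentrantReadWriteLock` allows the thread that holds the write lock to also take the read lock, so `block()` only rejects other threads.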



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);
+
+        PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);

Review Comment:
   I don't see a TODO; I asked about it last time. This code stinks. `Time is chosen randomly (long enough)`: if it's long enough, it's not random. If it's random, it can't be long enough. I don't like this solution; please create a JIRA ticket to fix all such places.
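Here is a sketch of what the reviewer is asking for. It gives the timeout a name instead of describing it as "random". It also keeps the timeout, failure, and interruption cases distinct, rather than catching `Exception` as the removed `createMvPartitionStorage` code did. `AwaitDestructionSketch` and `DESTROY_TIMEOUT_MILLIS` are hypothetical. The 10-second value only mirrors the diff, and a real fix would belong in the JIRA ticket the reviewer requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for waiting on a partition destruction future. */
public class AwaitDestructionSketch {
    /** Named timeout; the value only mirrors the 10 seconds used in the diff. */
    public static final long DESTROY_TIMEOUT_MILLIS = 10_000;

    public static void awaitDestruction(CompletableFuture<Void> future) {
        awaitDestruction(future, DESTROY_TIMEOUT_MILLIS);
    }

    /** Waits with a bounded timeout and translates each failure mode separately. */
    public static void awaitDestruction(CompletableFuture<Void> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Partition destruction did not finish in " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            // Unwrap so that callers see the actual destruction failure.
            throw new IllegalStateException("Partition destruction failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for partition destruction", e);
        }
    }
}
```

Unlike `join()`, `get(timeout, unit)` cannot hang forever. The price is that the caller must decide what a timeout means, which is exactly the decision the reviewer wants made explicitly rather than hidden behind a magic number.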





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1073652228


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });

Review Comment:
   Why force it?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072249213


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Tried to fix it.
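These hunks replace each hand-written `enterBusy()` / `try` / `finally leaveBusy()` block with a single `busy(() -> ...)` call. The sketch below is a minimal, self-contained version of that idea. `SimpleBusyLock` is a hypothetical simplified stand-in, not Ignite's `IgniteSpinBusyLock`. `GuardedStorage` is a hypothetical toy class.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Hypothetical busy lock: many operations may be busy at once; block() waits for them and rejects new ones. */
final class SimpleBusyLock {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private volatile boolean blocked;

    boolean enterBusy() {
        rwLock.readLock().lock();
        if (blocked) {
            rwLock.readLock().unlock();
            return false;
        }
        return true;
    }

    void leaveBusy() {
        rwLock.readLock().unlock();
    }

    /** Waits until in-flight operations leave, then rejects all later enterBusy() calls. */
    void block() {
        rwLock.writeLock().lock();
        try {
            blocked = true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

final class GuardedStorage {
    private final SimpleBusyLock busyLock = new SimpleBusyLock();
    private long persistedIndex = 7;

    /** Runs the supplier while holding the busy lock, or fails fast if the storage is closed. */
    private <V> V busy(Supplier<V> supplier) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Storage is closed");
        }
        try {
            return supplier.get();
        } finally {
            busyLock.leaveBusy();
        }
    }

    long persistedIndex() {
        // One line instead of the enterBusy/try/finally boilerplate.
        return busy(() -> persistedIndex);
    }

    void close() {
        busyLock.block();
    }
}
```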



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Tried to fix it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072185181


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;

Review Comment:
   Restored it.
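The hunk introduces `protected static final VarHandle CLOSED`, but its initialization and the field it targets are not visible in this excerpt. Below is a sketch of one common way to use such a handle. It assumes a hypothetical `closed` field and class name. The handle is resolved once with `MethodHandles.lookup().findVarHandle(...)`. `close()` then flips the flag with `compareAndSet`, so its cleanup runs only once.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicInteger;

class CloseOnceResource {
    private static final VarHandle CLOSED;

    static {
        try {
            CLOSED = MethodHandles.lookup().findVarHandle(CloseOnceResource.class, "closed", boolean.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Updated through CLOSED; volatile so plain reads also see the latest value. */
    private volatile boolean closed;

    /** Counts how many times cleanup actually ran (for demonstration only). */
    final AtomicInteger cleanups = new AtomicInteger();

    /** Only the first caller wins the CAS and performs cleanup; later calls are no-ops. */
    void close() {
        if (!CLOSED.compareAndSet(this, false, true)) {
            return;
        }
        cleanups.incrementAndGet();
    }

    boolean isClosed() {
        return closed;
    }
}
```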





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072201651


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -167,86 +203,79 @@ public CompletableFuture<Void> destroy() {
 
     @Override
     public AbstractPageMemoryMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
-        AbstractPageMemoryMvPartitionStorage partition = getMvPartition(partitionId);
+        return inBusyLock(busyLock, () -> {
+            AbstractPageMemoryMvPartitionStorage partition = getMvPartitionStorageWithoutBusyLock(partitionId);
 
-        if (partition != null) {
-            return partition;
-        }
+            if (partition != null) {
+                return partition;
+            }
 
-        partition = createMvPartitionStorage(partitionId);
+            partition = createMvPartitionStorage(partitionId);
 
-        partition.start();
+            partition.start();
 
-        mvPartitions.set(partitionId, partition);
+            mvPartitions.set(partitionId, partition);
 
-        return partition;
+            return partition;
+        });
     }
 
     @Override
     public @Nullable AbstractPageMemoryMvPartitionStorage getMvPartition(int partitionId) {
-        assert started : "Storage has not started yet";
-
-        checkPartitionId(partitionId);
-
-        return mvPartitions.get(partitionId);
+        return inBusyLock(busyLock, () -> getMvPartitionStorageWithoutBusyLock(partitionId));
     }
 
     @Override
     public CompletableFuture<Void> destroyPartition(int partitionId) {
-        assert started : "Storage has not started yet";
+        return inBusyLock(busyLock, () -> {

Review Comment:
   Tried to fix it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072256909


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });
             }
 
             @Override
             public R next() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);

Review Comment:
   Same here. But why not use a generic type parameter for this class?
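In the anonymous cursor, `hasNext()` and `next()` repeat the same rebalance guard. One way to act on the generic suggestion is a single generic wrapper that runs a state check before delegating. The sketch below uses hypothetical names (`GuardedIterator`, `StorageState`). It also substitutes `java.util.Iterator` for Ignite's `Cursor`.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical storage states; the PR reads its own state via `state.get()`. */
enum StorageState { RUNNABLE, REBALANCE }

/** Wraps any iterator and re-checks the storage state before every call. */
final class GuardedIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final AtomicReference<StorageState> state;

    GuardedIterator(Iterator<T> delegate, AtomicReference<StorageState> state) {
        this.delegate = delegate;
        this.state = state;
    }

    /** Stands in for throwExceptionIfStorageInProgressOfRebalance(...). */
    private void checkState() {
        if (state.get() == StorageState.REBALANCE) {
            throw new IllegalStateException("Storage is in the process of rebalance");
        }
    }

    @Override
    public boolean hasNext() {
        checkState();
        return delegate.hasNext();
    }

    @Override
    public T next() {
        checkState();
        return delegate.next();
    }
}
```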





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072275714


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -157,35 +141,31 @@ protected AbstractPageMemoryMvPartitionStorage(
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Tried to fix it.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -528,20 +516,16 @@ private static byte[] rowBytes(@Nullable BinaryRow row) {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
-        assert rowId.partitionId() == partitionId : rowId;
-
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            assert rowId.partitionId() == partitionId : rowId;
 
-        try {
             VersionChain currentChain = findVersionChain(rowId);
 
             if (currentChain == null) {
                 RowVersion newVersion = insertRowVersion(row, NULL_LINK);
 
-                VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
-                        NULL_LINK);
+                VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId,
+                        newVersion.link(), NULL_LINK);

Review Comment:
   Fixed it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072279893


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -216,51 +197,41 @@ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws Stor
 
     @Override
     public long persistedIndex() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            return persistedIndex;
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        return busy(() -> persistedIndex);
     }
 
     @Override
     @Nullable
     public RaftGroupConfiguration committedGroupConfiguration() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            replicationProtocolGroupConfigReadWriteLock.readLock().lock();
-
+        return busy(() -> {

Review Comment:
   Tried to fix it.
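In `committedGroupConfiguration()`, the busy-lock guard and the read lock on `replicationProtocolGroupConfigReadWriteLock` serve different purposes. The busy lock rejects calls on a closed storage. The read lock keeps the read consistent with concurrent writers. Below is a sketch of the read-lock half wrapped in the same lambda style, with the busy lock omitted for brevity. It assumes a hypothetical `GroupConfigHolder` that stores the configuration as a `List<String>`; the real type is `RaftGroupConfiguration`.

```java
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class GroupConfigHolder {
    private final ReadWriteLock configLock = new ReentrantReadWriteLock();

    /** Guarded by configLock; null until the first configuration is committed. */
    private List<String> committedPeers;

    /** Runs the supplier under the read lock so it never observes a half-written update. */
    private <V> V underReadLock(Supplier<V> supplier) {
        configLock.readLock().lock();
        try {
            return supplier.get();
        } finally {
            configLock.readLock().unlock();
        }
    }

    List<String> committedPeers() {
        return underReadLock(() -> committedPeers);
    }

    void commit(List<String> peers) {
        configLock.writeLock().lock();
        try {
            // Store an immutable copy so readers cannot see later mutations of the argument.
            committedPeers = List.copyOf(peers);
        } finally {
            configLock.writeLock().unlock();
        }
    }
}
```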





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072249467


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, row.indexColumns().byteBuffer());
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, row.indexColumns().byteBuffer());
+                HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, row.rowId());
 
-            HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, row.rowId());
+                var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, freeList, hashIndexTree.inlineSize());
 
-            var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, freeList, hashIndexTree.inlineSize());
+                hashIndexTree.invoke(hashIndexRow, null, insert);
 
-            hashIndexTree.invoke(hashIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Tried to fix it.
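`busy(...)` takes a value-returning lambda, so void methods such as `put(IndexRow)` have to end with `return null;`. Whether the PR's helper has a void variant is not shown here. Below is a sketch of one way to hide that detail. It uses a hypothetical `IndexGuard` with a `Supplier`-based `busy` and a `Runnable`-based `busyRun`, plus a toy `ListIndex`. The guard is deliberately simplified: a real busy lock would also wait for in-flight operations on close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Simplified guard: only checks a flag, unlike a real busy lock. */
final class IndexGuard {
    private volatile boolean closed;

    /** Value-returning form, matching the shape of the busy(...) calls in the diff. */
    <V> V busy(Supplier<V> supplier) {
        if (closed) {
            throw new IllegalStateException("Storage is closed");
        }
        return supplier.get();
    }

    /** Hypothetical void form: keeps the `return null` in one place. */
    void busyRun(Runnable action) {
        busy(() -> {
            action.run();
            return null;
        });
    }

    void close() {
        closed = true;
    }
}

/** Toy, single-threaded index used only to show the call sites. */
final class ListIndex {
    private final IndexGuard guard = new IndexGuard();
    private final List<String> rows = new ArrayList<>();

    void put(String row) {
        // No trailing `return null;` needed at the call site.
        guard.busyRun(() -> rows.add(row));
    }

    int size() {
        return guard.busy(rows::size);
    }

    void close() {
        guard.close();
    }
}
```

Using a distinct name rather than a second `busy` overload avoids any doubt about which overload an expression lambda such as `() -> rows.add(row)` selects, since that lambda fits both `Supplier` and `Runnable`.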





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072250057


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                SortedIndexRow sortedIndexRow = toSortedIndexRow(row.indexColumns(), row.rowId());
 
-        try {
-            SortedIndexRow sortedIndexRow = toSortedIndexRow(row.indexColumns(), row.rowId());
+                var insert = new InsertSortedIndexRowInvokeClosure(sortedIndexRow, freeList, sortedIndexTree.inlineSize());
 
-            var insert = new InsertSortedIndexRowInvokeClosure(sortedIndexRow, freeList, sortedIndexTree.inlineSize());
+                sortedIndexTree.invoke(sortedIndexRow, null, insert);
 
-            sortedIndexTree.invoke(sortedIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Tried to fix it.
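The sorted index's `get` delegates to `convertCursor(sortedIndexTree.find(...), SortedIndexRow::rowId)`. The hash index builds an equivalent anonymous cursor by hand with `cursor.next().rowId()`. Below is a sketch of the mapping idea using `java.util.Iterator` and a hypothetical `MappingIterator`. Ignite's actual `convertCursor` signature and `Cursor` interface are not reproduced. `IndexEntry` is a hypothetical row type.

```java
import java.util.Iterator;
import java.util.function.Function;

/** Lazily applies a mapping function to each element of a source iterator. */
final class MappingIterator<S, T> implements Iterator<T> {
    private final Iterator<S> source;
    private final Function<? super S, ? extends T> mapper;

    MappingIterator(Iterator<S> source, Function<? super S, ? extends T> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        // Mapping happens per element, so nothing is materialized up front.
        return mapper.apply(source.next());
    }
}

/** Hypothetical index row: a key plus the id of the row it points to. */
final class IndexEntry {
    private final String key;
    private final long rowId;

    IndexEntry(String key, long rowId) {
        this.key = key;
        this.rowId = rowId;
    }

    String key() {
        return key;
    }

    long rowId() {
        return rowId;
    }
}
```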





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072251321


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });

Review Comment:
   Same here. But why not use a generic type parameter for this class?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1506: IGNITE-18029 Implementation of a full rebalance for PersistentPageMemoryMvPartitionStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072415322


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId);
+                VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId);

Review Comment:
   Try to fix it.

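The `inCheckpointLock` helper in the diff above does two things. It takes the checkpoint read lock before the free lists and trees are recreated, and it releases that lock in a `finally` block. The sketch below is a minimal illustration of that shape. It assumes a plain `ReentrantReadWriteLock` as a hypothetical stand-in for `CheckpointTimeoutLock`; it is not the Ignite class.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Minimal sketch: ReentrantReadWriteLock is a hypothetical stand-in for Ignite's
// CheckpointTimeoutLock, used only to show the acquire / try / finally-release shape.
class CheckpointLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Runs the supplier while holding the read lock. The finally block releases the lock
    // even if the supplier throws, so a failing step cannot leak the lock.
    <V> V inCheckpointLock(Supplier<V> supplier) {
        lock.readLock().lock();

        try {
            return supplier.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Read holds owned by the calling thread: 1 inside inCheckpointLock, 0 outside.
    int readHoldCount() {
        return lock.getReadHoldCount();
    }

    public static void main(String[] args) {
        CheckpointLockSketch sketch = new CheckpointLockSketch();

        int holdsInside = sketch.inCheckpointLock(sketch::readHoldCount);

        System.out.println(holdsInside + " " + sketch.readHoldCount()); // prints "1 0"
    }
}
```

Returning the supplier's value lets the caller build several objects inside one lock section. The diff's lambda returns `null` because it only needs the side effect of `updateDataStructuresOnRebalance`.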

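`destroyPartitionPhysically` performs its steps in a fixed order. It marks the file page store for destruction and invalidates the partition in page memory synchronously. It then waits for the checkpointer (`onPartitionDestruction`), removes the partition meta, and deletes the partition files. The following is a hedged sketch of how that `thenAccept` / `thenCompose` chain orders the work. Every method in it is a hypothetical stand-in that only records a step name; none of them are Ignite APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of the ordering in destroyPartitionPhysically: every step only records
// its name, and none of these methods are Ignite APIs.
class PartitionDestroySketch {
    final List<String> steps = new CopyOnWriteArrayList<>();

    // Stand-in for checkpointManager().onPartitionDestruction(..).
    CompletableFuture<Void> onPartitionDestruction() {
        return CompletableFuture.runAsync(() -> steps.add("checkpointer released partition"));
    }

    // Stand-in for filePageStoreManager().destroyPartition(..).
    CompletableFuture<Void> destroyPartitionFiles() {
        return CompletableFuture.runAsync(() -> steps.add("partition files deleted"));
    }

    CompletableFuture<Void> destroyPartitionPhysically() {
        // Synchronous steps run before any future is created.
        steps.add("store marked to destroy");
        steps.add("page memory invalidated");

        return onPartitionDestruction()
                // Plain synchronous callback: thenAccept is enough.
                .thenAccept(unused -> steps.add("meta removed"))
                // The callback returns a future; thenCompose flattens it, so the result
                // completes only after the files have been deleted.
                .thenCompose(unused -> destroyPartitionFiles());
    }

    public static void main(String[] args) {
        PartitionDestroySketch sketch = new PartitionDestroySketch();

        sketch.destroyPartitionPhysically().join();

        // Prints the five steps in the order they were recorded.
        sketch.steps.forEach(System.out::println);
    }
}
```

Using `thenCompose` for the last step matters. With `thenApply`, the method would return a future of a future, and callers such as `clearStorageAndUpdateDataStructures` could start rebuilding structures before the old files are gone.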

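In `getOrCreatePartitionMeta`, the file page store is seeded with `meta.pageCount()`. A page allocation listener then keeps the meta's page count in step with every newly allocated page. The sketch below models only that wiring. `Meta` and `Store` are hypothetical, simplified stand-ins for `PartitionMeta` and `FilePageStore`. The real `incrementPageCount` also takes the last checkpoint id and is asserted to run under the checkpoint lock, and this sketch omits both.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical, simplified stand-ins for PartitionMeta and FilePageStore; the real Ignite
// classes track page counts per checkpoint, which this sketch ignores.
class PageCountSketch {
    static class Meta {
        private final AtomicInteger pageCount;

        Meta(int initialPageCount) {
            pageCount = new AtomicInteger(initialPageCount);
        }

        int pageCount() {
            return pageCount.get();
        }

        void incrementPageCount() {
            pageCount.incrementAndGet();
        }
    }

    static class Store {
        private int pages;
        private IntConsumer allocationListener = pageIdx -> { };

        void pages(int pages) {
            this.pages = pages;
        }

        void setPageAllocationListener(IntConsumer listener) {
            this.allocationListener = listener;
        }

        // Allocates the next page index and notifies the listener.
        int allocatePage() {
            int pageIdx = pages++;

            allocationListener.accept(pageIdx);

            return pageIdx;
        }
    }

    // Mirrors the wiring in getOrCreatePartitionMeta: seed the store from the meta,
    // then keep the meta in sync on every allocation.
    static Store wire(Meta meta) {
        Store store = new Store();

        store.pages(meta.pageCount());
        store.setPageAllocationListener(pageIdx -> meta.incrementPageCount());

        return store;
    }

    public static void main(String[] args) {
        Meta meta = new Meta(3);
        Store store = wire(meta);

        int first = store.allocatePage();
        int second = store.allocatePage();

        System.out.println(first + " " + second + " " + meta.pageCount()); // prints "3 4 5"
    }
}
```

Seeding the store from the meta before installing the listener means newly allocated page indexes continue after the last persisted page, instead of restarting at zero.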