Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/26 08:05:22 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1056289362


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   Do all write methods need to be functional? I assume only two of them are actually useful.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));

Review Comment:
   `getOutConfigRangePartitionId` is a very confusing name. Maybe you can rename it to something like `getPartitionIdOutOfRange`?
   My problem with it is that you usually read a method name as "verb + object + something at the end", and "get out" just feels like an inappropriately (and incorrectly) used phrasal verb. I blame that on the word order.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {
+            @Nullable
+            private Boolean hasNext;
+
+            @Nullable
+            private VersionChain versionChain;
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                checkStorageClosedOrInProcessFullRebalance();
 
-        return Cursor.fromBareIterator(
-                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
-                        .peek(versionChain -> checkClosed())
-                        .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
-                        .iterator()
-        );
+                advanceIfNeeded();
+
+                return hasNext;
+            }
+
+            @Override
+            public ReadResult next() {
+                checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Why don't you move this call inside of `advanceIfNeeded()`?
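
   A minimal sketch of that idea, assuming a simplified stand-in for the storage: `SketchStorage`, `checkState()` and the `String` element type are hypothetical and not part of the PR. The state check lives only in `advanceIfNeeded()`, so `hasNext()` and `next()` cannot forget it.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the test storage; not the real TestMvPartitionStorage. */
class SketchStorage {
    private final List<String> versions;

    volatile boolean closed;
    volatile boolean rebalanceInProgress;

    SketchStorage(List<String> versions) {
        this.versions = versions;
    }

    /** Throws if the storage is closed or a rebalance is in progress. */
    private void checkState() {
        if (closed) {
            throw new IllegalStateException("Storage is already closed");
        }

        if (rebalanceInProgress) {
            throw new IllegalStateException("Storage is in the process of a rebalance");
        }
    }

    Iterator<String> scanVersions() {
        checkState();

        Iterator<String> source = versions.iterator();

        return new Iterator<String>() {
            /** {@code null} means the next element has not been looked up yet. */
            private Boolean hasNext;

            private String current;

            @Override
            public boolean hasNext() {
                advanceIfNeeded();

                return hasNext;
            }

            @Override
            public String next() {
                advanceIfNeeded();

                if (!hasNext) {
                    throw new NoSuchElementException();
                }

                // Forget the cached answer so that the following call advances again.
                hasNext = null;

                return current;
            }

            private void advanceIfNeeded() {
                // The only place where the storage state is checked; it still runs on every call.
                checkState();

                if (hasNext != null) {
                    return;
                }

                hasNext = source.hasNext();
                current = hasNext ? source.next() : null;
            }
        };
    }
}
```

   The check stays the first statement of `advanceIfNeeded()`, ahead of the early return, so a cursor that has already cached its answer still fails once the storage changes state.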



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+            return null;
+        });
 
-        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        assertSame(partitionStorage, tableStorage.getMvPartition(PARTITION_ID));
+        MvPartitionStorage newMvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+        assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
     }
 
     @Test
-    public void testFinishRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testSuccessFullRebalance() throws Exception {

Review Comment:
   Damn, this test is long. Is there a way to simplify it? It's too much, I won't even try to follow it



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   I see no such assertion in "abort" method, why?
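
   One possible way to make the two methods consistent is sketched below. `RebalanceState`, `requireInProgress` and the sentinel value are hypothetical; the actual value of `FULL_REBALANCE_IN_PROGRESS` is not shown in this diff. Whether "abort" should be strict or a silent no-op is exactly the open question, so the strict variant here is only an assumption.

```java
/** Hypothetical holder of the rebalance flag; names and values are illustrative only. */
class RebalanceState {
    /** Assumed sentinel; the real constant's value is not visible in the diff. */
    static final long REBALANCE_IN_PROGRESS = -1;

    private boolean inProgress;

    private long lastAppliedIndex;

    private long lastAppliedTerm;

    void start() {
        inProgress = true;

        lastAppliedIndex = REBALANCE_IN_PROGRESS;
        lastAppliedTerm = REBALANCE_IN_PROGRESS;
    }

    void abort() {
        // Same precondition as in finish(), so the two methods cannot drift apart.
        requireInProgress("abort");

        inProgress = false;

        lastAppliedIndex = 0;
        lastAppliedTerm = 0;
    }

    void finish(long lastAppliedIndex, long lastAppliedTerm) {
        requireInProgress("finish");

        inProgress = false;

        this.lastAppliedIndex = lastAppliedIndex;
        this.lastAppliedTerm = lastAppliedTerm;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long lastAppliedTerm() {
        return lastAppliedTerm;
    }

    private void requireInProgress(String operation) {
        if (!inProgress) {
            throw new IllegalStateException("Cannot " + operation + ": rebalance is not in progress");
        }
    }
}
```

   The sketch throws an explicit exception where the PR uses `assert`, so the check also fires when JVM assertions are disabled; that is a design choice of the sketch, not something the PR does.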



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));

Review Comment:
   Well, how about using `assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully());`? We already have this thing in our test framework.
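
   For readers unfamiliar with that matcher: the sketch below is not its implementation, only a plain-JDK approximation of the kind of check it replaces. `FutureChecks` and `completesSuccessfully` are hypothetical names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; it only approximates what a "completes successfully" matcher verifies. */
final class FutureChecks {
    private FutureChecks() {
    }

    /** Returns {@code true} only if the future completes normally within the timeout. */
    static boolean completesSuccessfully(CompletableFuture<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);

            return true;
        } catch (ExecutionException | TimeoutException | CancellationException e) {
            // Failed, timed out or cancelled: none of these counts as a success.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();

            return false;
        }
    }
}
```

   With a matcher the assertion states its intent in one expression and avoids the `assertDoesNotThrow(() -> future.get(1, SECONDS))` wrapping at each call site.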



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -561,4 +800,61 @@ private void checkStorageDestroyed(IndexStorage storage) {
 
         assertThrows(StorageClosedException.class, () -> storage.remove(indexRow));
     }
+
+    private int getOutConfigRangePartitionId() {

Review Comment:
   Same comment about the name



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -464,13 +507,15 @@ public ReadResult next() {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Again, the wording raises some questions. Maybe "in process **of** full rebalance"?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -229,34 +236,90 @@ public CompletableFuture<Void> destroy() {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startFullRebalancePartition(int partitionId) {

Review Comment:
   Why do we need to add "full" everywhere? As if we have a dedicated process for non-full rebalance, that is known to the storage? No, we don't have it



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   I would add an explicit assumption here that the storage is not volatile, otherwise it makes no sense.
   (by that I mean the `assumeTrue` or `assumeThat` methods)



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -315,4 +314,51 @@ private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
             return rowIdEntry == null ? null : rowIdEntry.getKey();
         }
     }
+
+    private void checkStorageClosed() {
+        if (closed) {
+            throw new StorageClosedException("Storage is already closed");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    /**
+     * Starts a full rebalancing for the storage.
+     */
+    public void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts a full rebalance of the storage.
+     */
+    public void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes a full rebalance of the storage.
+     */
+    public void finishFullRebalance() {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Same question here



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Wait, I thought that we discussed it. How is it possible that there's a rebalance during destruction? This should not happen.
   Also, where's the documentation of this behavior?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   Can it be an inner class, not anonymous?


