Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/22 17:55:27 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

tkalkirill opened a new pull request, #1471:
URL: https://github.com/apache/ignite-3/pull/1471

   https://issues.apache.org/jira/browse/IGNITE-18073


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057187847


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -315,4 +314,51 @@ private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
             return rowIdEntry == null ? null : rowIdEntry.getKey();
         }
     }
+
+    private void checkStorageClosed() {
+        if (closed) {
+            throw new StorageClosedException("Storage is already closed");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    /**
+     * Starts a full rebalancing for the storage.
+     */
+    public void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts a full rebalance of the storage.
+     */
+    public void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes a full rebalance of the storage.
+     */
+    public void finishFullRebalance() {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Fix it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057186091


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   I don't think it would be any better.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057185453


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -561,4 +800,61 @@ private void checkStorageDestroyed(IndexStorage storage) {
 
         assertThrows(StorageClosedException.class, () -> storage.remove(indexRow));
     }
+
+    private int getOutConfigRangePartitionId() {

Review Comment:
   Fix it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057315115


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   Fix it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057319327


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Wait a moment: here I only check whether the partition is still being rebalanced when `MvTableStorage#destroyPartition` is called. If it is, the method cannot be executed. I added this to the documentation.
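
A minimal sketch of the guard described in this comment, for illustration only. The class name `RebalanceGuardSketch` is hypothetical, `IllegalStateException` stands in for `StorageException`, and everything except the `containsKey` check from the diff above is an assumption rather than the PR's actual code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a table storage; only the rebalance guard is modelled.
class RebalanceGuardSketch {
    // Partitions with a full rebalance in flight, keyed by partition ID (field name taken from the diff).
    private final Map<Integer, CompletableFuture<Void>> fullRebalanceFutureByPartitionId = new ConcurrentHashMap<>();

    // Assumption: starting a rebalance registers a future for the partition.
    void startFullRebalancePartition(int partitionId) {
        fullRebalanceFutureByPartitionId.put(partitionId, new CompletableFuture<>());
    }

    // Assumption: finishing a rebalance removes and completes that future.
    void finishFullRebalancePartition(int partitionId) {
        CompletableFuture<Void> future = fullRebalanceFutureByPartitionId.remove(partitionId);

        if (future != null) {
            future.complete(null);
        }
    }

    CompletableFuture<Void> destroyPartition(int partitionId) {
        // The check under discussion: destroying is rejected while the partition is still rebalancing.
        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
            throw new IllegalStateException("Partition in the process of full rebalancing: " + partitionId);
        }

        // The real method would destroy the storages here; the sketch just reports success.
        return CompletableFuture.completedFuture(null);
    }
}
```

With this shape, a caller that wants to destroy a partition has to finish or abort the rebalance first; the sketch does not try to queue the destroy behind the rebalance.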





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057242196


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   Ok





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057519730


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -586,6 +586,10 @@ void startRebalance() {
     void abortRebalance() {
         checkStorageClosed();
 
+        if (!rebalanced) {

Review Comment:
   The name is confusing. The past tense implies that the rebalance has already happened, not that it is in progress.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057187175


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -229,34 +236,90 @@ public CompletableFuture<Void> destroy() {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startFullRebalancePartition(int partitionId) {

Review Comment:
   I'll clean it up





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057338320


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Are you proposing that aborting a rebalance should fail if the rebalance hasn't started yet?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057317729


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   I'll try to fix it
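
The contract in the Javadoc above can be sketched as a small state machine. This is an illustration only: the class name, the `write`/`read` helpers, and the value `-1` for `FULL_REBALANCE_IN_PROGRESS` are assumptions (the constant's real value does not appear in this thread), and `IllegalStateException` stands in for `StorageFullRebalanceException`. The body of `finishFullRebalance` after the state check is also assumed, since the quoted diff is cut off there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory storage that models only the full-rebalance lifecycle.
class FullRebalanceLifecycleSketch {
    // Assumed placeholder; the real constant's value is not shown in the thread.
    static final long FULL_REBALANCE_IN_PROGRESS = -1;

    private final List<String> rows = new ArrayList<>();
    private boolean fullRebalance;
    private long lastAppliedIndex;
    private long lastAppliedTerm;

    // Writes stay available while a full rebalance is in progress.
    void write(String row) {
        rows.add(row);
    }

    // Reads are rejected while a full rebalance is in progress.
    List<String> read() {
        if (fullRebalance) {
            throw new IllegalStateException("Storage in the process of a full rebalancing");
        }

        return new ArrayList<>(rows);
    }

    void startFullRebalance() {
        fullRebalance = true;
        rows.clear();
        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
    }

    void abortFullRebalance() {
        fullRebalance = false;
        rows.clear();
        lastAppliedIndex = 0;
        lastAppliedTerm = 0;
    }

    // Assumption: finishing records the applied index and term and re-enables reads.
    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        if (!fullRebalance) {
            throw new IllegalStateException("Full rebalance has not been started");
        }

        fullRebalance = false;
        this.lastAppliedIndex = lastAppliedIndex;
        this.lastAppliedTerm = lastAppliedTerm;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long lastAppliedTerm() {
        return lastAppliedTerm;
    }
}
```

The sketch throws an explicit exception in `finishFullRebalance` instead of using `assert`, so the check also runs when assertions are disabled; that is a choice made for the sketch, not a statement about the PR.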





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057183244


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   Why not?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057186915


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Does that make sense?
   I think that if the rebalance has not started yet, then canceling it should not produce an error.
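
One way to read this position is an abort that is a no-op when no rebalance has been started. The following is a sketch under that assumption; the class name, the `rebalanceInProgress` flag, and the `clearCount` counter are hypothetical and exist only to make the behaviour observable.

```java
// Hypothetical storage that models only the start/abort interaction.
class IdempotentAbortSketch {
    // Named for the "in progress" state rather than a completed one.
    private boolean rebalanceInProgress;

    // Counts how often the storage was cleared, so the no-op path is observable.
    private int clearCount;

    void startRebalance() {
        rebalanceInProgress = true;
        clear();
    }

    void abortRebalance() {
        // Nothing was started, so there is nothing to undo and no error to raise.
        if (!rebalanceInProgress) {
            return;
        }

        rebalanceInProgress = false;
        clear();
    }

    private void clear() {
        clearCount++;
    }

    int clearCount() {
        return clearCount;
    }
}
```

Calling `abortRebalance()` before `startRebalance()`, or twice in a row, leaves the storage untouched, which keeps cancellation paths simple for callers that do not track whether the start actually happened.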





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057242367


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   It would: the method would become much smaller. And, generally speaking, smaller methods are better.
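
As an illustration of the suggestion, the anonymous cursor could be moved into a named nested class so that the scan method shrinks to a state check plus a constructor call. This is only a sketch: `java.util.Iterator` stands in for Ignite's `Cursor`, `IllegalStateException` stands in for `StorageClosedException`, and `ScanSketch` and `CheckedIterator` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical storage; only the scan path is modelled.
class ScanSketch {
    private volatile boolean closed;

    // Fixed sample data standing in for a version chain.
    private final List<String> versions = List.of("v2", "v1");

    // With the iterator extracted, the scan method is two statements long.
    Iterator<String> scanVersions() {
        checkStorageClosed();

        return new CheckedIterator<>(versions.iterator(), this::checkStorageClosed);
    }

    void close() {
        closed = true;
    }

    private void checkStorageClosed() {
        if (closed) {
            throw new IllegalStateException("Storage is already closed");
        }
    }

    // Named replacement for the anonymous class: re-runs the state check on every call.
    private static final class CheckedIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final Runnable stateCheck;

        CheckedIterator(Iterator<T> delegate, Runnable stateCheck) {
            this.delegate = delegate;
            this.stateCheck = stateCheck;
        }

        @Override
        public boolean hasNext() {
            stateCheck.run();

            return delegate.hasNext();
        }

        @Override
        public T next() {
            stateCheck.run();

            return delegate.next();
        }
    }
}
```

A side effect of the extraction is that the same wrapper can be reused by other scan methods that need the identical check.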





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057185264


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));

Review Comment:
   Fix it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057183638


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));

Review Comment:
   Fix it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057191221


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+            return null;
+        });
 
-        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        assertSame(partitionStorage, tableStorage.getMvPartition(PARTITION_ID));
+        MvPartitionStorage newMvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+        assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
     }
 
     @Test
-    public void testFinishRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testSuccessFullRebalance() throws Exception {

Review Comment:
   I'll try to simplify.





[GitHub] [ignite-3] tkalkirill merged pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill merged PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471




[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057483053


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   No, I propose asserting that we don't abort a rebalance that doesn't exist.
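
   A minimal sketch of the proposed check, assuming a simplified stand-in class rather than the real `TestMvPartitionStorage`. The class name, the sentinel value `-1` and the helper `requireRebalance` are hypothetical. The sketch throws `IllegalStateException` instead of using a plain `assert`, so the check fires even when JVM assertions are disabled.

```java
/** Hypothetical stand-in for a storage that tracks rebalance state. */
final class RebalanceStateSketch {
    /** Assumed sentinel; the real FULL_REBALANCE_IN_PROGRESS value is not shown in this thread. */
    static final long REBALANCE_IN_PROGRESS = -1L;

    private boolean rebalance;
    private long lastAppliedIndex;
    private long lastAppliedTerm;

    void startRebalance() {
        rebalance = true;
        lastAppliedIndex = REBALANCE_IN_PROGRESS;
        lastAppliedTerm = REBALANCE_IN_PROGRESS;
    }

    void abortRebalance() {
        // Same guard as in finishRebalance(): aborting a rebalance that was never started is a bug.
        requireRebalance("abort");

        rebalance = false;
        lastAppliedIndex = 0;
        lastAppliedTerm = 0;
    }

    void finishRebalance(long index, long term) {
        requireRebalance("finish");

        rebalance = false;
        lastAppliedIndex = index;
        lastAppliedTerm = term;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long lastAppliedTerm() {
        return lastAppliedTerm;
    }

    private void requireRebalance(String action) {
        if (!rebalance) {
            throw new IllegalStateException("Cannot " + action + " a rebalance that has not been started");
        }
    }
}
```

   With this shape, "abort" and "finish" share one precondition, so neither can silently reset the applied index of a storage that was never being rebalanced.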





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057185707


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {
+            @Nullable
+            private Boolean hasNext;
+
+            @Nullable
+            private VersionChain versionChain;
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                checkStorageClosedOrInProcessFullRebalance();
 
-        return Cursor.fromBareIterator(
-                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
-                        .peek(versionChain -> checkClosed())
-                        .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
-                        .iterator()
-        );
+                advanceIfNeeded();
+
+                return hasNext;
+            }
+
+            @Override
+            public ReadResult next() {
+                checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Fix it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057191014


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Funnily enough, you and I discussed this for `TxStateStorage`, but I didn't apply it here. I'll correct it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057323927


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Added some documentation and tests.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1056289362


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   Do all write methods need to be functional? I assume only two of them are actually useful.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));

Review Comment:
   `getOutConfigRangePartitionId` is a very confusing name. Maybe you can rename it to something like `getPartitionIdOutOfRange`?
   My problem with it is that you usually read a method name as "verb + object + something at the end", and "get out" just feels like an inappropriately (and incorrectly) used phrasal verb. I blame that on the word order.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {
+            @Nullable
+            private Boolean hasNext;
+
+            @Nullable
+            private VersionChain versionChain;
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                checkStorageClosedOrInProcessFullRebalance();
 
-        return Cursor.fromBareIterator(
-                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
-                        .peek(versionChain -> checkClosed())
-                        .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
-                        .iterator()
-        );
+                advanceIfNeeded();
+
+                return hasNext;
+            }
+
+            @Override
+            public ReadResult next() {
+                checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Why don't you move this call inside of `advanceIfNeeded()`?
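
   A minimal sketch of that suggestion, assuming a simplified stand-in for the storage. `SketchStorage`, `checkAvailable` and the use of `IllegalStateException` in place of `StorageClosedException` / `StorageFullRebalanceException` are hypothetical and chosen only to keep the example self-contained.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the storage; only the state flags matter here. */
final class SketchStorage {
    volatile boolean closed;
    volatile boolean rebalance;

    /** Throws if the storage is closed or is being rebalanced. */
    void checkAvailable() {
        if (closed) {
            throw new IllegalStateException("Storage is already closed");
        }

        if (rebalance) {
            throw new IllegalStateException("Storage is in the process of rebalance");
        }
    }

    /** Returns a cursor whose only state check lives in advanceIfNeeded(). */
    <T> Iterator<T> cursor(List<T> items) {
        checkAvailable();

        Iterator<T> delegate = items.iterator();

        return new Iterator<>() {
            private Boolean hasNext; // null means "not computed yet"
            private T next;

            private void advanceIfNeeded() {
                // Single place for the check: both hasNext() and next() pass through here.
                checkAvailable();

                if (hasNext == null) {
                    hasNext = delegate.hasNext();
                    next = hasNext ? delegate.next() : null;
                }
            }

            @Override
            public boolean hasNext() {
                advanceIfNeeded();

                return hasNext;
            }

            @Override
            public T next() {
                advanceIfNeeded();

                if (!hasNext) {
                    throw new NoSuchElementException();
                }

                hasNext = null; // Force a fresh advance (and a fresh check) on the next call.

                return next;
            }
        };
    }
}
```

   Because the check runs on every call to `advanceIfNeeded()`, not only when the next element is computed, a cursor that has already buffered an element still fails once the storage is closed or a rebalance starts.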



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+            return null;
+        });
 
-        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        assertSame(partitionStorage, tableStorage.getMvPartition(PARTITION_ID));
+        MvPartitionStorage newMvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+        assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
     }
 
     @Test
-    public void testFinishRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testSuccessFullRebalance() throws Exception {

Review Comment:
   Damn, this test is long. Is there a way to simplify it? It's too much, I won't even try to follow it



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   I see no such assertion in "abort" method, why?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));

Review Comment:
   Well, how about using `assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully());`? We already have this thing in our test framework.
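
   For readers unfamiliar with that matcher, here is a rough, standard-library-only sketch of what such a check amounts to. `FutureChecks.completesSuccessfully` is a hypothetical helper written for illustration; it is not the test framework's implementation, and the timeout is an explicit parameter here purely by assumption.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that mirrors the idea of a "will complete successfully" matcher. */
final class FutureChecks {
    private FutureChecks() {
    }

    /** Returns true only if the future completes normally within the given timeout. */
    static boolean completesSuccessfully(CompletableFuture<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);

            return true;
        } catch (ExecutionException | TimeoutException | CancellationException e) {
            // Failed, timed out, or cancelled: none of these count as success.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();

            return false;
        }
    }
}
```

   Compared with wrapping `future.get(1, SECONDS)` in `assertDoesNotThrow`, a dedicated check keeps the timeout handling in one place and states the intent of the assertion directly.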



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -561,4 +800,61 @@ private void checkStorageDestroyed(IndexStorage storage) {
 
         assertThrows(StorageClosedException.class, () -> storage.remove(indexRow));
     }
+
+    private int getOutConfigRangePartitionId() {

Review Comment:
   Same comment about the name



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -464,13 +507,15 @@ public ReadResult next() {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Again, the word usage raises some questions. Maybe "in process **of** full rebalance"?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -229,34 +236,90 @@ public CompletableFuture<Void> destroy() {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startFullRebalancePartition(int partitionId) {

Review Comment:
   Why do we need to add "full" everywhere? It reads as if there were a dedicated non-full rebalance process that the storage knows about. There isn't one.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   I would add an explicit assumption here that the storage is not volatile, otherwise it makes no sense.
   (by that I mean `assumeTrue` or `assumeThat` methods)



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -315,4 +314,51 @@ private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
             return rowIdEntry == null ? null : rowIdEntry.getKey();
         }
     }
+
+    private void checkStorageClosed() {
+        if (closed) {
+            throw new StorageClosedException("Storage is already closed");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    /**
+     * Starts a full rebalancing for the storage.
+     */
+    public void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts a full rebalance of the storage.
+     */
+    public void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes a full rebalance of the storage.
+     */
+    public void finishFullRebalance() {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Same question here



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Wait, I thought that we discussed it. How is it possible that there's a rebalance during destruction? This should not happen.
   Also, where's the documentation of this behavior?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   Can it be an inner class, not anonymous?
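
   A minimal sketch of the named-inner-class shape, assuming a simplified storage. `SketchStorage`, `ScanVersionsCursor`, and the use of `Iterator<String>` as a stand-in for the project's `Cursor<ReadResult>` are all hypothetical; `IllegalStateException` stands in for `StorageClosedException`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Simplified stand-in for a test storage; all names here are hypothetical. */
public class SketchStorage {
    private final List<String> versions;

    private volatile boolean closed;

    public SketchStorage(List<String> versions) {
        this.versions = List.copyOf(versions);
    }

    public Iterator<String> scanVersions() {
        checkStorageClosed();

        return new ScanVersionsCursor();
    }

    public void close() {
        closed = true;
    }

    private void checkStorageClosed() {
        if (closed) {
            throw new IllegalStateException("Storage is already closed");
        }
    }

    /** Named inner class: it reads the enclosing storage's state and re-checks it on every call. */
    private class ScanVersionsCursor implements Iterator<String> {
        private int position;

        @Override
        public boolean hasNext() {
            checkStorageClosed();

            return position < versions.size();
        }

        @Override
        public String next() {
            checkStorageClosed();

            if (position >= versions.size()) {
                throw new NoSuchElementException();
            }

            return versions.get(position++);
        }
    }
}
```

   Compared with an anonymous class, the named one gets a readable name in stack traces and keeps the `scanVersions` method body short.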





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057184825


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   The test still makes sense for a volatile storage, since we ourselves can make a mistake there (fail to destroy the storage or forget to remove it from some structure).





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057186347


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -464,13 +507,15 @@ public ReadResult next() {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Fixed.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1056294380


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   I would add an explicit assumption here that the storage is not volatile, otherwise it makes no sense.
   (by that I mean `assumeTrue` or `assumeThat` methods)





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057241671


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   Because there is a real possibility of shooting yourself in the foot. The more restrictive we are, the better. I don't trust people who call the storage API :)
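
   A minimal sketch of that restriction, assuming a much simpler storage than the real one. `RestrictedStorage` and `RebalanceInProgressException` are hypothetical names; the exception stands in for the PR's `StorageFullRebalanceException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical storage that rejects reads while a rebalance is in progress. */
public class RestrictedStorage {
    /** Stand-in for the project's StorageFullRebalanceException. */
    public static class RebalanceInProgressException extends RuntimeException {
        public RebalanceInProgressException(String message) {
            super(message);
        }
    }

    private final Map<String, String> rows = new ConcurrentHashMap<>();

    private volatile boolean rebalance;

    /** Marks the storage as rebalancing and drops the old data. */
    public void startRebalance() {
        rebalance = true;

        rows.clear();
    }

    public void finishRebalance() {
        rebalance = false;
    }

    /** Writes stay available during a rebalance so that incoming data can be loaded. */
    public void put(String key, String value) {
        rows.put(key, value);
    }

    /** Reads fail fast instead of returning partially loaded data. */
    public String get(String key) {
        if (rebalance) {
            throw new RebalanceInProgressException("Storage is in the process of a rebalance");
        }

        return rows.get(key);
    }
}
```

   The point of the restriction is that a caller who reads too early gets an exception right away rather than a silently incomplete result.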





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057242784


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Do you mean that we set this flag only once the "start full rebalance" future has completed, not when it is _created_? That is counter-intuitive to me: we should be able to check for a concurrent rebalance _before_ data deletion has started.
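
   A minimal sketch of the ordering asked for here: register the rebalance first, then run the cleanup. `RebalanceTracker` and its methods are hypothetical, the map name is only loosely modelled on `fullRebalanceFutureByPartitionId` from the diff, and the cleanup runs synchronously to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical tracker; names are illustrative and not taken from the PR. */
public class RebalanceTracker {
    private final ConcurrentMap<Integer, CompletableFuture<Void>> rebalanceFutureByPartitionId = new ConcurrentHashMap<>();

    /** Registers the rebalance before any data is deleted, so concurrent callers are rejected early. */
    public CompletableFuture<Void> startRebalance(int partitionId, Runnable cleanup) {
        CompletableFuture<Void> future = new CompletableFuture<>();

        // The future is visible to other threads from this point on, i.e. from its creation.
        if (rebalanceFutureByPartitionId.putIfAbsent(partitionId, future) != null) {
            throw new IllegalStateException("Rebalance is already in progress: " + partitionId);
        }

        try {
            cleanup.run();

            future.complete(null);
        } catch (RuntimeException e) {
            // Cleanup failed: unregister so that a later attempt can start again.
            rebalanceFutureByPartitionId.remove(partitionId, future);

            future.completeExceptionally(e);
        }

        return future;
    }

    public boolean isRebalanceInProgress(int partitionId) {
        return rebalanceFutureByPartitionId.containsKey(partitionId);
    }

    public void finishRebalance(int partitionId) {
        rebalanceFutureByPartitionId.remove(partitionId);
    }
}
```

   With this ordering, a second `startRebalance` call for the same partition fails before the first call's cleanup has finished, which is the check described above.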





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1471: IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and indexes

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1057505651


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Discussed in person.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Discussed in person.


