You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/27 07:11:31 UTC
[ignite-3] branch main updated: IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba76fcf2da IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)
ba76fcf2da is described below
commit ba76fcf2da5299e79a8e5204e73a57b4616d7656
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 27 10:11:25 2023 +0300
IGNITE-18603 Clear all storages of a partition if one of the storages did not have time to rebalance (#1578)
---
.../internal/storage/engine/MvTableStorage.java | 26 +++++
.../ignite/internal/storage/util/StorageState.java | 5 +-
.../ignite/internal/storage/util/StorageUtils.java | 51 ++++++++++
.../storage/AbstractMvTableStorageTest.java | 88 ++++++++++++++---
.../storage/impl/TestMvPartitionStorage.java | 16 ++-
.../internal/storage/impl/TestMvTableStorage.java | 18 ++++
.../storage/index/impl/TestHashIndexStorage.java | 12 ++-
.../storage/index/impl/TestSortedIndexStorage.java | 12 ++-
.../pagememory/AbstractPageMemoryTableStorage.java | 29 ++++++
.../PersistentPageMemoryTableStorage.java | 32 +-----
.../pagememory/VolatilePageMemoryTableStorage.java | 2 +-
.../index/hash/PageMemoryHashIndexStorage.java | 37 ++++++-
.../index/sorted/PageMemorySortedIndexStorage.java | 37 ++++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 90 +++++++++++++----
.../mv/PersistentPageMemoryMvPartitionStorage.java | 52 +++++++---
.../mv/VolatilePageMemoryMvPartitionStorage.java | 63 ++++++++----
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 50 ++++++++--
.../storage/rocksdb/RocksDbTableStorage.java | 58 ++++++++++-
.../rocksdb/index/RocksDbHashIndexStorage.java | 25 +++++
.../rocksdb/index/RocksDbSortedIndexStorage.java | 25 +++++
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 7 --
.../internal/table/distributed/TableManager.java | 41 +++++---
.../internal/tx/storage/state/TxStateStorage.java | 18 ++++
.../state/rocksdb/TxStateRocksDbStorage.java | 109 ++++++++++++++-------
.../state/rocksdb/RocksDbTxStateStorageTest.java | 23 ++---
.../storage/state/AbstractTxStateStorageTest.java | 79 ++++++++++++---
.../tx/storage/state/test/TestTxStateStorage.java | 16 +++
27 files changed, 803 insertions(+), 218 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 6e4619759d..71fae079d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.schema.configuration.index.TableIndexConfigura
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -257,4 +258,29 @@ public interface MvTableStorage extends ManuallyCloseable {
long lastAppliedTerm,
RaftGroupConfiguration raftGroupConfig
);
+
+ /**
+ * Clears a partition and all associated indices. After the cleanup is completed, the partition and all associated indices will be fully
+ * available again.
+ * <ul>
+ * <li>Cancels all current operations (including cursors) of a {@link MvPartitionStorage multi-version partition storage} and its
+ * associated indexes ({@link HashIndexStorage} and {@link SortedIndexStorage}) and waits for their completion;</li>
+ * <li>Does not allow operations on a multi-version partition storage and its indexes to be performed (exceptions will be thrown)
+ * until the cleanup is completed;</li>
+ * <li>Clears a multi-version partition storage and its indexes;</li>
+ * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()}, {@link MvPartitionStorage#lastAppliedTerm()},
+ * {@link MvPartitionStorage#persistedIndex()} to {@code 0} and {@link MvPartitionStorage#committedGroupConfiguration()} to
+ * {@code null};</li>
+ * <li>Once the cleanup of a multi-version partition storage and its indexes is complete (success or error), allows all operations
+ * on them again.</li>
+ * </ul>
+ *
+ * @param partitionId Partition ID.
+ * @return Future of cleanup of a multi-version partition storage and its indexes.
+ * @throws IllegalArgumentException If the partition ID is out of bounds.
+ * @throws StorageClosedException If a multi-version partition storage and its indexes are already closed or destroyed.
+ * @throws StorageRebalanceException If a multi-version partition storage and its indexes are in the process of rebalance.
+ * @throws StorageException If the partition does not exist, is in the process of cleanup, or the cleanup failed for another reason.
+ */
+ CompletableFuture<Void> clearPartition(int partitionId);
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
index 711df878cf..8d22252765 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageState.java
@@ -28,5 +28,8 @@ public enum StorageState {
CLOSED,
/** Storage is in the process of rebalancing. */
- REBALANCE
+ REBALANCE,
+
+ /** Storage is in the process of cleanup; it returns to {@link #RUNNABLE} once the cleanup finishes (successfully or not). */
+ CLEANUP
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
index 9034d5567d..cf67fbf12a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
@@ -36,6 +36,49 @@ public class StorageUtils {
return "Partition ID " + partitionId + " does not exist";
}
+ /**
+ * Throws an exception if the state of the storage is not {@link StorageState#RUNNABLE}.
+ *
+ * @param state Storage state.
+ * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+ * @throws StorageClosedException If the storage is closed.
+ * @throws StorageRebalanceException If storage is in the process of rebalancing.
+ * @throws StorageException For other {@link StorageState}.
+ */
+ public static void throwExceptionIfStorageNotInRunnableState(StorageState state, Supplier<String> storageInfoSupplier) {
+ if (state != StorageState.RUNNABLE) {
+ throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+ }
+ }
+
+ /**
+ * Throws an exception if the state of the storage is neither {@link StorageState#RUNNABLE} nor {@link StorageState#REBALANCE}.
+ *
+ * @param state Storage state.
+ * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+ * @throws StorageClosedException If the storage is closed.
+ * @throws StorageException For other {@link StorageState}.
+ */
+ public static void throwExceptionIfStorageNotInRunnableOrRebalanceState(StorageState state, Supplier<String> storageInfoSupplier) {
+ if (state != StorageState.RUNNABLE && state != StorageState.REBALANCE) {
+ throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+ }
+ }
+
+ /**
+ * Throws an exception if the state of the storage is neither {@link StorageState#CLEANUP} nor {@link StorageState#REBALANCE}.
+ *
+ * @param state Storage state.
+ * @param storageInfoSupplier Storage information supplier, for example in the format "table=user, partitionId=1".
+ * @throws StorageClosedException If the storage is closed.
+ * @throws StorageException For other {@link StorageState}.
+ */
+ public static void throwExceptionIfStorageNotInCleanupOrRebalancedState(StorageState state, Supplier<String> storageInfoSupplier) {
+ if (state != StorageState.CLEANUP && state != StorageState.REBALANCE) {
+ throwExceptionDependingOnStorageState(state, storageInfoSupplier.get());
+ }
+ }
+
/**
* Throws an {@link StorageRebalanceException} if the storage is in the process of rebalancing.
*
@@ -61,6 +104,8 @@ public class StorageUtils {
throw new StorageRebalanceException(createStorageClosedErrorMessage(storageInfo));
case REBALANCE:
throw new StorageRebalanceException(createStorageInProcessOfRebalanceErrorMessage(storageInfo));
+ case CLEANUP:
+ throw new StorageRebalanceException(createStorageInProcessOfCleanupErrorMessage(storageInfo));
default:
throw new StorageRebalanceException(createUnexpectedStorageStateErrorMessage(state, storageInfo));
}
@@ -81,6 +126,8 @@ public class StorageUtils {
throw new StorageClosedException(createStorageClosedErrorMessage(storageInfo));
case REBALANCE:
throw new StorageRebalanceException(createStorageInProcessOfRebalanceErrorMessage(storageInfo));
+ case CLEANUP:
+ throw new StorageException(createStorageInProcessOfCleanupErrorMessage(storageInfo));
default:
throw new StorageException(createUnexpectedStorageStateErrorMessage(state, storageInfo));
}
@@ -109,4 +156,8 @@ public class StorageUtils {
private static String createStorageClosedErrorMessage(String storageInfo) {
return IgniteStringFormatter.format("Storage is already closed: [{}]", storageInfo);
}
+
+ private static String createStorageInProcessOfCleanupErrorMessage(String storageInfo) {
+ return IgniteStringFormatter.format("Storage is in the process of cleanup: [{}]", storageInfo);
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 3a70adbfef..d88d271621 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -39,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
@@ -593,19 +592,15 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThat(tableStorage.destroy(), willCompleteSuccessfully());
}
- /**
- * Checks that if we restart the storages after a crash in the middle of a rebalance, the storages will be empty.
- */
@Test
- public void testRestartStoragesAfterFailDuringRebalance() {
- assumeFalse(tableStorage.isVolatile());
-
+ public void testRestartStoragesInTheMiddleOfRebalance() {
MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
- new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now())
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
);
fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
@@ -632,10 +627,76 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
- // Let's check the repositories: they should be empty.
- checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+ if (tableStorage.isVolatile()) {
+ // Let's check the storages: a volatile storage should be empty after the restart.
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+
+ checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ } else {
+ checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+ // The unfinished rebalance is still visible: the last applied values remain REBALANCE_IN_PROGRESS.
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ }
+ }
+
+ @Test
+ void testClear() {
+ assertThrows(IllegalArgumentException.class, () -> tableStorage.clearPartition(getPartitionIdOutOfRange()));
+
+ // Let's check that there will be an error for a non-existent partition.
+ assertThrows(StorageException.class, () -> tableStorage.clearPartition(PARTITION_ID));
+
+ MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+ SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+ // Let's check the cleanup for an empty partition.
+ assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());
+
+ checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ assertNull(mvPartitionStorage.committedGroupConfiguration());
+
+ // Let's fill the storages and clean them.
+ List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows = List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+ );
+
+ RaftGroupConfiguration raftGroupConfig = createRandomRaftGroupConfiguration();
+
+ fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+
+ mvPartitionStorage.runConsistently(() -> {
+ mvPartitionStorage.lastApplied(100, 500);
+
+ mvPartitionStorage.committedGroupConfiguration(raftGroupConfig);
+
+ return null;
+ });
+
+ // Let's clear the storages and check that the rows, last applied values and group configuration are reset.
+ assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());
checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ assertNull(mvPartitionStorage.committedGroupConfiguration());
+
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
+ }
+
+ @Test
+ void testClearForCanceledOrRebalancedPartition() {
+ MvPartitionStorage mvPartitionStorage0 = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ tableStorage.getOrCreateMvPartition(PARTITION_ID + 1);
+ // Partition PARTITION_ID gets closed, partition PARTITION_ID + 1 gets rebalanced; clearing either must fail.
+ mvPartitionStorage0.close();
+ assertThat(tableStorage.startRebalancePartition(PARTITION_ID + 1), willCompleteSuccessfully());
+
+ try {
+ assertThrows(StorageClosedException.class, () -> tableStorage.clearPartition(PARTITION_ID));
+ assertThrows(StorageRebalanceException.class, () -> tableStorage.clearPartition(PARTITION_ID + 1));
+ } finally {
+ assertThat(tableStorage.abortRebalancePartition(PARTITION_ID + 1), willCompleteSuccessfully());
+ }
}
private static void createTestIndexes(TablesConfiguration tablesConfig) {
@@ -742,8 +803,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
SortedIndexStorage sortedIndexStorage,
List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rowsBeforeRebalanceStart
) {
- assertThat(rowsBeforeRebalanceStart, hasSize(greaterThanOrEqualTo(2)));
-
fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);
// Let's open the cursors before start rebalance.
@@ -852,6 +911,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
SortedIndexStorage sortedIndexStorage,
List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rows
) {
+ assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
+ // At least two rows are needed so that both branches of the even/odd split below are exercised.
for (int i = 0; i < rows.size(); i++) {
int finalI = i;
@@ -865,8 +926,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), tableRow, rowId);
mvPartitionStorage.runConsistently(() -> {
- // If even.
- if ((finalI & 1) == 0) {
+ if ((finalI % 2) == 0) {
mvPartitionStorage.addWrite(rowId, tableRow, UUID.randomUUID(), UUID.randomUUID(), rowId.partitionId());
mvPartitionStorage.commitWrite(rowId, timestamp);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 814e230c69..17e525fe4e 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -540,7 +540,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
closed = true;
- clear();
+ clear0();
}
public void destroy() {
@@ -549,9 +549,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
/** Removes all entries from this storage. */
public synchronized void clear() {
+ checkStorageClosedOrInProcessOfRebalance();
+ // Only the public clear() checks the state; callers that have already set the closed/rebalance flags use clear0() directly.
+ clear0();
+ }
+
+ private synchronized void clear0() {
map.clear();
gcQueue.clear();
+ // Also reset the last applied index/term and the group configuration, not only the data.
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ groupConfig = null;
}
private void checkStorageClosed() {
@@ -576,7 +586,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
rebalance = true;
- clear();
+ clear0();
lastAppliedIndex = REBALANCE_IN_PROGRESS;
lastAppliedTerm = REBALANCE_IN_PROGRESS;
@@ -593,7 +603,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
rebalance = false;
- clear();
+ clear0();
lastAppliedIndex = 0;
lastAppliedTerm = 0;
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index c2d196aff2..33829de4ac 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -339,6 +339,24 @@ public class TestMvTableStorage implements MvTableStorage {
});
}
+ @Override
+ public CompletableFuture<Void> clearPartition(int partitionId) {
+ checkPartitionId(partitionId);
+
+ TestMvPartitionStorage mvPartitionStorage = partitions.get(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+ // Clear the partition storage first: its state check (closed / rebalance) fails before any index is touched.
+ mvPartitionStorage.clear();
+
+ testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::clear);
+ testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::clear);
+
+ return completedFuture(null);
+ }
+
private void checkPartitionId(int partitionId) {
Integer partitions = tableCfg.partitions().value();
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index d66773b94e..9dbdf16928 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -131,13 +131,19 @@ public class TestHashIndexStorage implements HashIndexStorage {
public void destroy() {
closed = true;
- clear();
+ clear0();
}
/**
* Removes all index data.
*/
public void clear() {
+ checkStorageClosedOrInProcessOfRebalance();
+ // destroy() and the rebalance methods set their flags first, so they call the unchecked clear0() directly.
+ clear0();
+ }
+
+ private void clear0() {
index.clear();
}
@@ -163,7 +169,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
rebalance = true;
- clear();
+ clear0();
}
/**
@@ -178,7 +184,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
rebalance = false;
- clear();
+ clear0();
}
/**
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index ba93ff06d1..9d15eabdc2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -178,13 +178,19 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
public void destroy() {
closed = true;
- clear();
+ clear0();
}
/**
* Removes all index data.
*/
public void clear() {
+ checkStorageClosedOrInProcessOfRebalance();
+ // NOTE(review): this duplicates clear0(); TestHashIndexStorage.clear() delegates to clear0() here instead.
+ index.clear();
+ }
+
+ private void clear0() {
index.clear();
}
@@ -337,7 +343,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
rebalance = true;
- clear();
+ clear0();
}
/**
@@ -352,7 +358,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
rebalance = false;
- clear();
+ clear0();
}
/**
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 6e6b2bf4e5..d2ee618127 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -412,6 +412,35 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
});
}
+ @Override
+ public CompletableFuture<Void> clearPartition(int partitionId) {
+ return inBusyLock(busyLock, () -> {
+ AbstractPageMemoryMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+ // NOTE(review): finishCleanup() also runs when startCleanup() throws (e.g. concurrent cleanup); confirm it can't reset that cleanup.
+ try {
+ mvPartitionStorage.startCleanup();
+ // finishCleanup() runs once the asynchronous cleanup completes, whether it succeeded or failed.
+ return clearStorageAndUpdateDataStructures(mvPartitionStorage)
+ .whenComplete((unused, throwable) -> mvPartitionStorage.finishCleanup());
+ } catch (StorageException e) {
+ mvPartitionStorage.finishCleanup();
+
+ throw e;
+ } catch (Throwable t) {
+ mvPartitionStorage.finishCleanup();
+
+ throw new StorageException(
+ IgniteStringFormatter.format("Failed to cleanup storage: [{}]", mvPartitionStorage.createStorageInfo()),
+ t
+ );
+ }
+ });
+ }
+
/**
* Clears the partition multi-version storage and all its indexes, updates their internal data structures such as {@link BplusTree},
* {@link FreeList} and {@link ReuseList}.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 34b3f50f35..5dbeb231df 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
-import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -395,10 +394,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
int partitionId = groupPartitionId.getPartitionId();
- PartitionMeta meta = getOrCreatePartitionMeta(
- groupPartitionId,
- ensurePartitionFilePageStoreExists(tableView, groupPartitionId)
- );
+ PartitionMeta meta = getOrCreatePartitionMetaOnCreatePartition(groupPartitionId);
inCheckpointLock(() -> {
RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
@@ -410,7 +406,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta);
- ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+ ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructures(
meta,
rowVersionFreeList,
indexColumnsFreeList,
@@ -485,7 +481,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
}
/**
- * Creates or receives partition meta from a file, if the rebalancing has not finished in time, the file page store will be recreated.
+ * Creates the partition meta or reads it from the partition file (as is, even if a previous rebalance did not finish).
*
* <p>Safe to use without a checkpointReadLock as we read the meta directly without using {@link PageMemory}.
*
@@ -496,27 +492,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
FilePageStore filePageStore = ensurePartitionFilePageStoreExists(tableView, groupPartitionId);
- PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore);
-
- if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
- try {
- // TODO: IGNITE-18565 We need to return a CompletableFuture
- destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw new StorageException(
- IgniteStringFormatter.format(
- "Error when physically destroying a partition: [table={}, partitionId={}]",
- getTableName(),
- groupPartitionId.getPartitionId()
- ),
- e
- );
- }
-
- return getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStoreExists(tableView, groupPartitionId));
- } else {
- return partitionMeta;
- }
+ return getOrCreatePartitionMeta(groupPartitionId, filePageStore);
}
private void waitPartitionToBeDestroyed(int partitionId) {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 66aeb50a45..584b54e29a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -154,7 +154,7 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
int partitionId = mvPartitionStorage.partitionId();
TableView tableView = tableCfg.value();
- volatilePartitionStorage.updateDataStructuresOnRebalance(
+ volatilePartitionStorage.updateDataStructures(
createVersionChainTree(partitionId, tableView),
createIndexMetaTree(partitionId, tableView)
);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 21fd68173f..7ad2203078 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.index.hash;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -286,14 +286,14 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
}
/**
- * Updates the internal data structures of the storage on rebalance.
+ * Updates the internal data structures of the storage on rebalance or cleanup.
*
* @param freeList Free list to store index columns.
* @param hashIndexTree Hash index tree instance.
- * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+ * @throws StorageException If the storage is in neither the cleanup nor the rebalance state.
*/
- public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
- throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+ public void updateDataStructures(IndexColumnsFreeList freeList, HashIndexTree hashIndexTree) {
+ throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
this.freeList = freeList;
this.hashIndexTree = hashIndexTree;
@@ -302,4 +302,31 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
private String createStorageInfo() {
return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionId);
}
+
+ /**
+ * Prepares the storage for cleanup: moves it from RUNNABLE to CLEANUP (throws otherwise) and closes the hash index tree.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ */
+ public void startCleanup() {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // The state is now CLEANUP, so storage operations are expected to stop soon; block the busy lock while closing the tree.
+ busyLock.block();
+
+ try {
+ hashIndexTree.close();
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Finishes the cleanup of the storage: moves it from CLEANUP back to RUNNABLE (does nothing in any other state).
+ */
+ public void finishCleanup() {
+ state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 79cde00a2e..111a1f6f78 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.index.sorted;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
@@ -422,14 +422,14 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
}
/**
- * Updates the internal data structures of the storage on rebalance.
+ * Updates the internal data structures of the storage on rebalance or cleanup.
*
* @param freeList Free list to store index columns.
* @param sortedIndexTree Sorted index tree instance.
- * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+ * @throws StorageException If the storage is in neither the cleanup nor the rebalance state.
*/
- public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
- throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+ public void updateDataStructures(IndexColumnsFreeList freeList, SortedIndexTree sortedIndexTree) {
+ throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
this.freeList = freeList;
this.sortedIndexTree = sortedIndexTree;
@@ -470,4 +470,31 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
indexRow.indexColumns().link(PageIdUtils.NULL_LINK);
}
}
+
+ /**
+ * Prepares the storage for cleanup.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ */
+ public void startCleanup() {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // Changed storage states and expect all storage operations to stop soon.
+ busyLock.block();
+
+ try {
+ sortedIndexTree.close();
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Finishes cleanup up the storage.
+ */
+ public void finishCleanup() {
+ state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 323d368648..d90c6be71f 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -21,7 +21,8 @@ import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.ge
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -146,6 +147,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
*/
public void start() {
busy(() -> {
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
NamedListView<TableIndexView> indexesCfgView = tableStorage.tablesConfiguration().indexes().value();
@@ -165,7 +168,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
return null;
} catch (Exception e) {
- throw new StorageException("Failed to process SQL indexes during the partition start", e);
+ throw new StorageException(
+ IgniteStringFormatter.format("Failed to process SQL indexes during partition start: [{}]", createStorageInfo()),
+ e
+ );
}
});
}
@@ -196,6 +202,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta indexMeta) {
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
var indexDescriptor = new HashIndexDescriptor(indexMeta.id(), tableStorage.tablesConfiguration().value());
HashIndexTree hashIndexTree = createHashIndexTree(indexDescriptor, indexMeta);
@@ -236,9 +244,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
} catch (IgniteInternalCheckedException e) {
throw new StorageException(
IgniteStringFormatter.format(
- "Error creating hash index tree: [table={}, partitionId={}, indexId={}]",
- tableStorage.getTableName(),
- partitionId,
+ "Error creating hash index tree: [{}, indexId={}]",
+ createStorageInfo(),
indexMeta.id()
),
e
@@ -247,6 +254,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
private PageMemorySortedIndexStorage createOrRestoreSortedIndex(IndexMeta indexMeta) {
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
+
var indexDescriptor = new SortedIndexDescriptor(indexMeta.id(), tableStorage.tablesConfiguration().value());
SortedIndexTree sortedIndexTree = createSortedIndexTree(indexDescriptor, indexMeta);
@@ -300,7 +309,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
@@ -517,6 +526,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
assert rowId.partitionId() == partitionId : rowId;
return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
VersionChain currentChain = findVersionChain(rowId);
if (currentChain == null) {
@@ -561,7 +572,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
assert rowId.partitionId() == partitionId : rowId;
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
VersionChain currentVersionChain = findVersionChain(rowId);
@@ -605,6 +616,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
assert rowId.partitionId() == partitionId : rowId;
busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -657,6 +670,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
assert rowId.partitionId() == partitionId : rowId;
busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
VersionChain currentChain = findVersionChain(rowId);
if (currentChain != null && currentChain.isUncommitted()) {
@@ -689,7 +704,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
return new ScanVersionsCursor(rowId);
});
@@ -715,7 +730,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
Cursor<VersionChain> treeCursor;
@@ -736,7 +751,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
try (Cursor<VersionChain> cursor = versionChainTree.find(new VersionChainKey(lowerBound), null)) {
return cursor.hasNext() ? cursor.next().rowId() : null;
@@ -749,7 +764,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public long rowsCount() {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
try {
return versionChainTree.size();
@@ -775,7 +790,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public final ReadResult next() {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
if (!hasNext()) {
throw new NoSuchElementException("The cursor is exhausted");
@@ -799,7 +814,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public @Nullable TableRow committed(HybridTimestamp timestamp) {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
if (currentChain == null) {
throw new IllegalStateException();
@@ -835,7 +850,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public boolean hasNext() {
return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
if (nextRead != null) {
return true;
@@ -848,8 +863,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
currentChain = null;
while (true) {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
-
if (!treeCursor.hasNext()) {
iterationExhausted = true;
@@ -887,6 +900,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
@Override
public boolean hasNext() {
return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+
if (nextRead != null) {
return true;
}
@@ -963,7 +978,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
private void advanceIfNeeded() {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
if (hasNext != null) {
return;
@@ -1006,7 +1021,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
StorageState state = this.state.get();
- assert state == StorageState.CLOSED : state;
+ assert state == StorageState.CLOSED : IgniteStringFormatter.format("{}, state={}", createStorageInfo(), state);
return;
}
@@ -1083,7 +1098,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
busyLock.block();
try {
- IgniteUtils.closeAll(getResourcesToCloseOnRebalance());
+ IgniteUtils.closeAll(getResourcesToCloseOnCleanup());
hashIndexes.values().forEach(PageMemoryHashIndexStorage::startRebalance);
sortedIndexes.values().forEach(PageMemorySortedIndexStorage::startRebalance);
@@ -1120,12 +1135,45 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
public abstract void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) throws StorageException;
/**
- * Returns resources that will have to close on rebalancing.
+ * Returns resources that will have to close on cleanup.
*/
- abstract List<AutoCloseable> getResourcesToCloseOnRebalance();
+ abstract List<AutoCloseable> getResourcesToCloseOnCleanup();
/**
* Sets the RAFT group configuration on rebalance.
*/
public abstract void committedGroupConfigurationOnRebalance(RaftGroupConfiguration config);
+
+ /**
+ * Prepares the storage and its indexes for cleanup.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ */
+ public void startCleanup() throws Exception {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // Changed storage states and expect all storage operations to stop soon.
+ busyLock.block();
+
+ try {
+ IgniteUtils.closeAll(getResourcesToCloseOnCleanup());
+
+ hashIndexes.values().forEach(PageMemoryHashIndexStorage::startCleanup);
+ sortedIndexes.values().forEach(PageMemorySortedIndexStorage::startCleanup);
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Finishes cleanup up the storage and its indexes.
+ */
+ public void finishCleanup() {
+ if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+ hashIndexes.values().forEach(PageMemoryHashIndexStorage::finishCleanup);
+ sortedIndexes.values().forEach(PageMemorySortedIndexStorage::finishCleanup);
+ }
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 2c3c54d9a5..2ef58c5553 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.util.List;
import java.util.UUID;
@@ -39,7 +41,6 @@ import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineView;
import org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
@@ -103,6 +104,9 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
DataRegion<PersistentPageMemory> dataRegion = tableStorage.dataRegion();
+ this.meta = meta;
+ persistedIndex = meta.lastAppliedIndex();
+
checkpointManager.addCheckpointListener(checkpointListener = new CheckpointListener() {
@Override
public void beforeCheckpointBegin(CheckpointProgress progress, @Nullable Executor exec) throws IgniteInternalCheckedException {
@@ -122,8 +126,6 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
}
}, dataRegion);
- this.meta = meta;
-
blobStorage = new BlobStorage(
rowVersionFreeList,
dataRegion.pageMemory(),
@@ -136,6 +138,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
checkpointTimeoutLock.checkpointReadLock();
try {
@@ -149,6 +153,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public CompletableFuture<Void> flush() {
return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
CheckpointProgress scheduledCheckpoint;
@@ -170,18 +176,26 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public long lastAppliedIndex() {
- return busy(meta::lastAppliedIndex);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return meta.lastAppliedIndex();
+ });
}
@Override
public long lastAppliedTerm() {
- return busy(meta::lastAppliedTerm);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return meta.lastAppliedTerm();
+ });
}
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
lastAppliedBusy(lastAppliedIndex, lastAppliedTerm);
@@ -201,13 +215,19 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public long persistedIndex() {
- return busy(() -> persistedIndex);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return persistedIndex;
+ });
}
@Override
@Nullable
public RaftGroupConfiguration committedGroupConfiguration() {
return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
try {
replicationProtocolGroupConfigReadWriteLock.readLock().lock();
@@ -233,7 +253,7 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
committedGroupConfigurationBusy(config);
@@ -345,23 +365,23 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
}
/**
- * Updates the internal data structures of the storage and its indexes on rebalance.
+ * Updates the internal data structures of the storage and its indexes on reblance or cleanup.
*
* @param meta Partition meta.
* @param rowVersionFreeList Free list for {@link RowVersion}.
* @param indexFreeList Free list fot {@link IndexColumns}.
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
- * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+ * @throws StorageException If failed.
*/
- public void updateDataStructuresOnRebalance(
+ public void updateDataStructures(
PartitionMeta meta,
RowVersionFreeList rowVersionFreeList,
IndexColumnsFreeList indexFreeList,
VersionChainTree versionChainTree,
IndexMetaTree indexMetaTree
) {
- throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
this.meta = meta;
@@ -379,14 +399,14 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
);
for (PageMemoryHashIndexStorage indexStorage : hashIndexes.values()) {
- indexStorage.updateDataStructuresOnRebalance(
+ indexStorage.updateDataStructures(
indexFreeList,
createHashIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
);
}
for (PageMemorySortedIndexStorage indexStorage : sortedIndexes.values()) {
- indexStorage.updateDataStructuresOnRebalance(
+ indexStorage.updateDataStructures(
indexFreeList,
createSortedIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
);
@@ -394,7 +414,7 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
}
@Override
- List<AutoCloseable> getResourcesToCloseOnRebalance() {
+ List<AutoCloseable> getResourcesToCloseOnCleanup() {
return List.of(
rowVersionFreeList::close,
indexFreeList::close,
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 362a3c3529..cc5348e7f9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableState;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -33,7 +35,6 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
import org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIndexStorage;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
@@ -93,28 +94,44 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
- return busy(closure::execute);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return closure.execute();
+ });
}
@Override
public CompletableFuture<Void> flush() {
- return busy(() -> completedFuture(null));
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return completedFuture(null);
+ });
}
@Override
public long lastAppliedIndex() {
- return busy(() -> lastAppliedIndex);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return lastAppliedIndex;
+ });
}
@Override
public long lastAppliedTerm() {
- return busy(() -> lastAppliedTerm);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return lastAppliedTerm;
+ });
}
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
@@ -125,18 +142,26 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
@Override
public long persistedIndex() {
- return busy(() -> lastAppliedIndex);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return lastAppliedIndex;
+ });
}
@Override
public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
- return busy(() -> groupConfig);
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+
+ return groupConfig;
+ });
}
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInRunnableState(state.get(), this::createStorageInfo);
groupConfig = config;
@@ -194,6 +219,10 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
hashIndexes.values().forEach(indexStorage -> indexStorage.startDestructionOn(destructionExecutor));
sortedIndexes.values().forEach(indexStorage -> indexStorage.startDestructionOn(destructionExecutor));
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ groupConfig = null;
+
if (removeIndexDescriptors) {
hashIndexes.clear();
sortedIndexes.clear();
@@ -249,35 +278,35 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
}
@Override
- List<AutoCloseable> getResourcesToCloseOnRebalance() {
+ List<AutoCloseable> getResourcesToCloseOnCleanup() {
return List.of(versionChainTree::close, indexMetaTree::close);
}
/**
- * Updates the internal data structures of the storage and its indexes on rebalance.
+ * Updates the internal data structures of the storage and its indexes on rebalance or cleanup.
*
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
- * @throws StorageRebalanceException If the storage is not in the process of rebalancing.
+ * @throws StorageException If failed.
*/
- public void updateDataStructuresOnRebalance(
+ public void updateDataStructures(
VersionChainTree versionChainTree,
IndexMetaTree indexMetaTree
) {
- throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
this.versionChainTree = versionChainTree;
this.indexMetaTree = indexMetaTree;
for (PageMemoryHashIndexStorage indexStorage : hashIndexes.values()) {
- indexStorage.updateDataStructuresOnRebalance(
+ indexStorage.updateDataStructures(
indexFreeList,
createHashIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
);
}
for (PageMemorySortedIndexStorage indexStorage : sortedIndexes.values()) {
- indexStorage.updateDataStructuresOnRebalance(
+ indexStorage.updateDataStructures(
indexFreeList,
createSortedIndexTree(indexStorage.indexDescriptor(), new IndexMeta(indexStorage.indexDescriptor().id(), 0L))
);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4c408fc3e2..4507794523 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -323,7 +323,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
try {
- saveLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
+ savePendingLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
return null;
} catch (RocksDBException e) {
@@ -332,7 +332,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
});
}
- private void saveLastApplied(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ private void savePendingLastApplied(
+ AbstractWriteBatch writeBatch,
+ long lastAppliedIndex,
+ long lastAppliedTerm
+ ) throws RocksDBException {
writeBatch.put(meta, lastAppliedIndexKey, longToBytes(lastAppliedIndex));
writeBatch.put(meta, lastAppliedTermKey, longToBytes(lastAppliedTerm));
@@ -1477,7 +1481,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
busyLock.block();
try {
- clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ clearStorage(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
} catch (RocksDBException e) {
throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
} finally {
@@ -1496,7 +1500,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
try {
- clearStorageOnRebalance(writeBatch, 0, 0);
+ clearStorage(writeBatch, 0, 0);
} catch (RocksDBException e) {
throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
}
@@ -1513,7 +1517,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
try {
- saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
saveRaftGroupConfigurationOnRebalance(writeBatch, raftGroupConfig);
} catch (RocksDBException e) {
@@ -1521,16 +1525,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
- saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ /**
+ * Stages a full cleanup of the partition in the given batch: saves the given last applied index and term, resets the in-memory
+ * group configurations and adds deletions of the group configuration key, the partition ID key and the partition data range.
+ *
+ * <p>NOTE(review): in-memory fields are updated here, before the batch is written; if the write fails, they may no longer match
+ * what is on disk.
+ */
+ private void clearStorage(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+ pendingGroupConfig = null;
+ lastGroupConfig = null;
writeBatch.delete(meta, lastGroupConfigKey);
writeBatch.delete(meta, partitionIdKey(partitionId));
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
- private void saveLastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
- saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ /**
+ * Passes the last applied index and term together with the batch to {@code savePendingLastApplied}, then updates the in-memory
+ * last applied index and term.
+ */
+ private void saveLastApplied(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ savePendingLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
@@ -1543,4 +1550,29 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
this.lastGroupConfig = config;
}
+
+ /**
+ * Prepares the storage for cleanup: switches its state from RUNNABLE to CLEANUP, blocks the busy lock and stages the cleanup of
+ * the partition in the given batch. Writing the batch is up to the caller.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ *
+ * @param writeBatch Batch to which the cleanup is added.
+ * @throws RocksDBException If staging the cleanup in the batch fails.
+ */
+ void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // The state has been switched to CLEANUP; block the busy lock and expect all ongoing storage operations to stop soon.
+ busyLock.block();
+
+ clearStorage(writeBatch, 0, 0);
+ }
+
+ /**
+ * Finishes cleaning up the storage.
+ *
+ * <p>If the storage is still in the CLEANUP state, switches it back to RUNNABLE and unblocks the busy lock; otherwise does
+ * nothing, so it can be called even if {@link #startCleanup(WriteBatch)} failed before changing the state.
+ */
+ void finishCleanup() {
+ if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+ busyLock.unblock();
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 2256c930b4..f6ebf8c7e5 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -670,7 +670,10 @@ public class RocksDbTableStorage implements MvTableStorage {
return completedFuture(null);
} catch (RocksDBException e) {
throw new StorageRebalanceException(
- "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ IgniteStringFormatter.format(
+ "Error when trying to start rebalancing storage: [{}]",
+ mvPartitionStorage.createStorageInfo()
+ ),
e
);
}
@@ -703,7 +706,10 @@ public class RocksDbTableStorage implements MvTableStorage {
return completedFuture(null);
} catch (RocksDBException e) {
throw new StorageRebalanceException(
- "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ IgniteStringFormatter.format(
+ "Error when trying to abort rebalancing storage: [{}]",
+ mvPartitionStorage.createStorageInfo()
+ ),
e
);
}
@@ -741,13 +747,59 @@ public class RocksDbTableStorage implements MvTableStorage {
return completedFuture(null);
} catch (RocksDBException e) {
throw new StorageRebalanceException(
- "Error when trying to finish rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ IgniteStringFormatter.format(
+ "Error when trying to finish rebalancing storage: [{}]",
+ mvPartitionStorage.createStorageInfo()
+ ),
e
);
}
});
}
+ @Override
+ public CompletableFuture<Void> clearPartition(int partitionId) {
+ return inBusyLock(busyLock, () -> {
+ RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+
+ List<RocksDbHashIndexStorage> hashIndexStorages = getHashIndexStorages(partitionId);
+ List<RocksDbSortedIndexStorage> sortedIndexStorages = getSortedIndexStorages(partitionId);
+
+ // The MV partition and all of its hash and sorted index storages stage their cleanup into one batch, which is then
+ // written by a single db.write call.
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartitionStorage.startCleanup(writeBatch);
+
+ for (RocksDbHashIndexStorage hashIndexStorage : hashIndexStorages) {
+ hashIndexStorage.startCleanup(writeBatch);
+ }
+
+ for (RocksDbSortedIndexStorage sortedIndexStorage : sortedIndexStorages) {
+ sortedIndexStorage.startCleanup(writeBatch);
+ }
+
+ db.write(writeOptions, writeBatch);
+ } catch (RocksDBException e) {
+ throw new StorageException(
+ IgniteStringFormatter.format(
+ "Error when trying to cleanup storage: [{}]",
+ mvPartitionStorage.createStorageInfo()
+ ),
+ e
+ );
+ } finally {
+ // finishCleanup() only acts on storages that are in the CLEANUP state, so it is called on every storage, including
+ // those whose startCleanup was never reached or failed before changing the state.
+ // NOTE(review): if a concurrent clearPartition call had put a storage into CLEANUP, this would also finish that
+ // cleanup (and unblock its busy lock) — confirm concurrent calls for the same partition cannot happen.
+ mvPartitionStorage.finishCleanup();
+
+ hashIndexStorages.forEach(RocksDbHashIndexStorage::finishCleanup);
+ sortedIndexStorages.forEach(RocksDbSortedIndexStorage::finishCleanup);
+ }
+
+ return completedFuture(null);
+ });
+ }
+
/**
* Returns table name.
*/
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 4e60f1e259..255f96ae02 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -344,4 +344,29 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
}
}
+
+ /**
+ * Prepares the storage for cleanup: switches its state from RUNNABLE to CLEANUP, blocks the busy lock and passes the given batch
+ * to {@code destroyData}. Writing the batch is up to the caller.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ *
+ * @param writeBatch Batch to which the cleanup is added.
+ * @throws RocksDBException If {@code destroyData} fails.
+ */
+ public void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // The state has been switched to CLEANUP; block the busy lock and expect all ongoing storage operations to stop soon.
+ busyLock.block();
+
+ destroyData(writeBatch);
+ }
+
+ /**
+ * Finishes cleaning up the storage.
+ *
+ * <p>If the storage is still in the CLEANUP state, switches it back to RUNNABLE and unblocks the busy lock; otherwise does
+ * nothing, so it can be called even if {@link #startCleanup(WriteBatch)} failed before changing the state.
+ */
+ public void finishCleanup() {
+ if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+ busyLock.unblock();
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index c22583f76a..bb7e923122 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -469,4 +469,29 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
}
}
+
+ /**
+ * Prepares the storage for cleanup: switches its state from RUNNABLE to CLEANUP, blocks the busy lock and passes the given batch
+ * to {@code destroyData}. Writing the batch is up to the caller.
+ *
+ * <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
+ *
+ * @param writeBatch Batch to which the cleanup is added.
+ * @throws RocksDBException If {@code destroyData} fails.
+ */
+ public void startCleanup(WriteBatch writeBatch) throws RocksDBException {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
+ }
+
+ // The state has been switched to CLEANUP; block the busy lock and expect all ongoing storage operations to stop soon.
+ busyLock.block();
+
+ destroyData(writeBatch);
+ }
+
+ /**
+ * Finishes cleaning up the storage.
+ *
+ * <p>If the storage is still in the CLEANUP state, switches it back to RUNNABLE and unblocks the busy lock; otherwise does
+ * nothing, so it can be called even if {@link #startCleanup(WriteBatch)} failed before changing the state.
+ */
+ public void finishCleanup() {
+ if (state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE)) {
+ busyLock.unblock();
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 581130a468..7bb22da652 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -132,11 +132,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
public void testDestroyTableStorage() throws Exception {
super.testDestroyTableStorage();
}
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
- @Override
- public void testRestartStoragesAfterFailDuringRebalance() {
- super.testRestartStoragesAfterFailDuringRebalance();
- }
+
+ // testRestartStoragesAfterFailDuringRebalance is not overridden (or disabled) here, so the version inherited from
+ // AbstractMvTableStorageTest runs for the RocksDB storage as-is.
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7a379d8b3c..c2296a8e2a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -752,7 +752,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return partitionDataStorageFut
.thenCompose(s -> storageUpdateHandlerFut)
- .thenCompose(s -> getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId))
+ .thenCompose(s -> getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId))
.thenAcceptAsync(txStatePartitionStorage -> {
RaftGroupOptions groupOptions = groupOptionsForPartition(
internalTbl.storage(),
@@ -818,7 +818,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
CompletableFuture<TxStateStorage> txStateStorageFuture =
- getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
+ getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId);
StorageUpdateHandler storageUpdateHandler = storageUpdateHandlerFut.join();
@@ -1847,7 +1847,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
StorageUpdateHandler storageUpdateHandler =
new StorageUpdateHandler(partId, partitionDataStorage, tbl.indexStorageAdapters(partId));
- TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
+ // NOTE(review): join() blocks the current thread until the storage is obtained (and cleared, if its rebalance was interrupted).
+ TxStateStorage txStatePartitionStorage = getOrCreateTxStateStorage(internalTable.txStateStorage(), partId).join();
RaftGroupOptions groupOptions = groupOptionsForPartition(
internalTable.storage(),
@@ -2057,8 +2057,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future that will complete when the operation completes.
*/
private CompletableFuture<MvPartitionStorage> getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
- // TODO: IGNITE-18603 Clear if TxStateStorage hasn't been rebalanced yet
- return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
+ // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
+ // TODO: IGNITE-18633 Also think about waiting for index stores for a partition, see PartitionAccessImpl.startRebalance
+ return CompletableFuture.supplyAsync(() -> mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
+ .thenCompose(mvPartitionStorage -> {
+ // A persisted index equal to REBALANCE_IN_PROGRESS means the previous rebalance of this partition did not finish,
+ // so the whole partition is cleared via MvTableStorage#clearPartition before the storage is handed out.
+ if (mvPartitionStorage.persistedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS) {
+ return mvTableStorage.clearPartition(partitionId).thenApply(unused -> mvPartitionStorage);
+ } else {
+ return completedFuture(mvPartitionStorage);
+ }
+ });
}
/**
@@ -2068,18 +2076,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* in when the rebalance was interrupted.
*
* @param txStateTableStorage Transaction state storage for a table.
- * @param partId Partition ID.
- */
- private static TxStateStorage getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
- // TODO: IGNITE-18603 Clear if MvPartitionStorage hasn't been rebalanced yet
- return txStateTableStorage.getOrCreateTxStateStorage(partId);
- }
-
- /**
- * Async version of {@link #getOrCreateTxStateStorage}.
+ * @param partitionId Partition ID.
+ * @return Future that will complete when the operation completes.
*/
- private CompletableFuture<TxStateStorage> getOrCreateTxStateStorageAsync(TxStateTableStorage txStateTableStorage, int partId) {
- return CompletableFuture.supplyAsync(() -> getOrCreateTxStateStorage(txStateTableStorage, partId), ioExecutor);
+ private CompletableFuture<TxStateStorage> getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partitionId) {
+ // TODO: IGNITE-18633 Should clean both MvPartitionStorage and TxStateStorage if the rebalance for one of them has not ended
+ return CompletableFuture.supplyAsync(() -> txStateTableStorage.getOrCreateTxStateStorage(partitionId), ioExecutor)
+ .thenCompose(txStateStorage -> {
+ // A persisted index equal to REBALANCE_IN_PROGRESS means the previous rebalance of this storage did not finish,
+ // so the transaction state storage is cleared before it is handed out.
+ if (txStateStorage.persistedIndex() == TxStateStorage.REBALANCE_IN_PROGRESS) {
+ return txStateStorage.clear().thenApply(unused -> txStateStorage);
+ } else {
+ return completedFuture(txStateStorage);
+ }
+ });
}
private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 7938cd43b4..1918108a0d 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -194,4 +194,22 @@ public interface TxStateStorage extends ManuallyCloseable {
* has failed.
*/
CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm);
+
+ /**
+ * Clears the transaction state storage. Once the cleanup is completed, the storage is fully available again.
+ * <ul>
+ * <li>Cancels all current operations (including cursors) with the storage and waits for their completion;</li>
+ * <li>Does not allow operations to be performed on the storage (exceptions will be thrown) until the cleanup is completed;</li>
+ * <li>Clears the storage;</li>
+ * <li>Sets {@link #lastAppliedIndex()}, {@link #lastAppliedTerm()} and {@link #persistedIndex()} to {@code 0};</li>
+ * <li>Once the cleanup is complete (successfully or with an error), allows all storage operations to be performed again.</li>
+ * </ul>
+ *
+ * @return Future of the transaction state storage cleanup.
+ * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_STOPPED_ERR} if the storage is closed or destroyed.
+ * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_REBALANCE_ERR} if the storage is in the process of
+ * rebalance.
+ * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} if the storage is in the process of cleanup or the
+ * cleanup failed for another reason.
+ */
+ CompletableFuture<Void> clear();
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 5aa7ddfa37..1fff2ab9cd 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -133,27 +133,10 @@ public class TxStateRocksDbStorage implements TxStateStorage {
byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
if (indexAndTermBytes != null) {
- long lastAppliedIndex = bytesToLong(indexAndTermBytes);
-
- if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
- try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
- writeBatch.delete(lastAppliedIndexAndTermKey);
-
- db.write(writeOptions, writeBatch);
- } catch (Exception e) {
- throw new IgniteInternalException(
- TX_STATE_STORAGE_REBALANCE_ERR,
- IgniteStringFormatter.format("Failed to clear storage:", createStorageInfo()),
- e
- );
- }
- } else {
- this.lastAppliedIndex = lastAppliedIndex;
- persistedIndex = lastAppliedIndex;
+ // The stored values are loaded as-is, including REBALANCE_IN_PROGRESS: an interrupted rebalance is no longer cleaned up
+ // here on start.
+ lastAppliedIndex = bytesToLong(indexAndTermBytes);
+ lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
- lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
- }
+ persistedIndex = lastAppliedIndex;
}
return null;
@@ -231,10 +214,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
// This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
// have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
if (state.get() != StorageState.REBALANCE) {
- writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(commandIndex, commandTerm));
-
- lastAppliedIndex = commandIndex;
- lastAppliedTerm = commandTerm;
+ updateLastApplied(writeBatch, commandIndex, commandTerm);
}
db.write(writeOptions, writeBatch);
@@ -428,7 +408,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+ clearStorageData(writeBatch);
writeBatch.delete(lastAppliedIndexAndTermKey);
@@ -483,14 +463,11 @@ public class TxStateRocksDbStorage implements TxStateStorage {
busyLock.block();
try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
- writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+ clearStorageData(writeBatch);
- db.write(writeOptions, writeBatch);
+ updateLastAppliedAndPersistedIndex(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
- lastAppliedIndex = REBALANCE_IN_PROGRESS;
- lastAppliedTerm = REBALANCE_IN_PROGRESS;
- persistedIndex = REBALANCE_IN_PROGRESS;
+ db.write(writeOptions, writeBatch);
return completedFuture(null);
} catch (Exception e) {
@@ -511,7 +488,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+ clearStorageData(writeBatch);
+
writeBatch.delete(lastAppliedIndexAndTermKey);
db.write(writeOptions, writeBatch);
@@ -542,14 +520,10 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+ updateLastAppliedAndPersistedIndex(writeBatch, lastAppliedIndex, lastAppliedTerm);
db.write(writeOptions, writeBatch);
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- this.persistedIndex = lastAppliedIndex;
-
state.set(StorageState.RUNNABLE);
} catch (Exception e) {
throw new IgniteInternalException(
@@ -562,6 +536,57 @@ public class TxStateRocksDbStorage implements TxStateStorage {
return completedFuture(null);
}
+ @Override
+ public CompletableFuture<Void> clear() {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
+ throwExceptionDependingOnStorageState();
+ }
+
+ // The state has been changed; now wait for all current operations with the storage (including cursors) to complete.
+ busyLock.block();
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ clearStorageData(writeBatch);
+
+ updateLastAppliedAndPersistedIndex(writeBatch, 0, 0);
+
+ db.write(writeOptions, writeBatch);
+
+ return completedFuture(null);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Failed to cleanup storage: [{}]", createStorageInfo()),
+ e
+ );
+ } finally {
+ // NOTE(review): the state is set back unconditionally. This assumes it can only be CLEANUP here (the CAS above succeeded
+ // and nothing else moves the storage out of CLEANUP); the MV and index storages use compareAndSet(CLEANUP, RUNNABLE).
+ state.set(StorageState.RUNNABLE);
+
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Adds a range deletion of the partition data (from {@code partitionStartPrefix()} to {@code partitionEndPrefix()}) to the batch.
+ */
+ private void clearStorageData(WriteBatch writeBatch) throws RocksDBException {
+ writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
+ }
+
+ /**
+ * Puts the last applied index and term into the batch and updates their in-memory values. The in-memory values change before
+ * the batch is written, so the caller is expected to write it.
+ */
+ private void updateLastApplied(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+ /**
+ * Same as {@link #updateLastApplied(WriteBatch, long, long)}, and additionally sets the in-memory persisted index to
+ * {@code lastAppliedIndex}.
+ */
+ private void updateLastAppliedAndPersistedIndex(
+ WriteBatch writeBatch,
+ long lastAppliedIndex,
+ long lastAppliedTerm
+ ) throws RocksDBException {
+ updateLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+ persistedIndex = lastAppliedIndex;
+ }
+
/**
* Tries to close the storage with resources if it hasn't already been closed.
*
@@ -609,6 +634,11 @@ public class TxStateRocksDbStorage implements TxStateStorage {
);
case REBALANCE:
throw createStorageInProgressOfRebalanceException();
+ // Reported while clear() is running.
+ case CLEANUP:
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ IgniteStringFormatter.format("Storage is in the process of cleanup: [{}]", createStorageInfo())
+ );
default:
throw new IgniteInternalException(
TX_STATE_STORAGE_ERR,
@@ -644,6 +674,9 @@ public class TxStateRocksDbStorage implements TxStateStorage {
CLOSED,
/** Storage is in the process of being rebalanced. */
- REBALANCE
+ REBALANCE,
+
+ /** Storage is in the process of cleanup. */
+ CLEANUP
}
}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index 1b62eea05d..b37cf34ccb 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -17,17 +17,15 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
-import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -36,7 +34,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.storage.state.AbstractTxStateStorageTest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,19 +61,21 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
}
@Test
- void testRestartStorageInProgressOfRebalance() throws Exception {
+ void testRestartStorageInProgressOfRebalance() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
- fillStorage(
- storage,
- List.of(randomTxMetaTuple(1, UUID.randomUUID()), randomTxMetaTuple(1, UUID.randomUUID()))
+ List<IgniteBiTuple<UUID, TxMeta>> rows = List.of(
+ randomTxMetaTuple(1, UUID.randomUUID()),
+ randomTxMetaTuple(1, UUID.randomUUID())
);
- storage.flush().get(10, TimeUnit.SECONDS);
+ fillStorage(storage, rows);
// We emulate the situation that the rebalancing did not have time to end.
storage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ assertThat(storage.flush(), willCompleteSuccessfully());
+
tableStorage.stop();
tableStorage = createTableStorage();
@@ -85,10 +84,8 @@ public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
storage = tableStorage.getOrCreateTxStateStorage(0);
- checkLastApplied(storage, 0, 0, 0);
+ // The storage no longer clears itself on start: both the REBALANCE_IN_PROGRESS marker and the rows survive the restart.
+ checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
- try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
- assertThat(scan.stream().collect(toList()), empty());
- }
+ checkStorageContainsRows(storage, rows);
}
}
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 3580d589e4..b748acbd7c 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.storage.state;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
@@ -317,20 +316,15 @@ public abstract class AbstractTxStateStorageTest {
// Let's check the storage.
checkLastApplied(storage, 30, 30, 50);
- try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
- assertThat(
- scan.stream().collect(toList()),
- containsInAnyOrder(rowsOnRebalance.toArray(new IgniteBiTuple[0]))
- );
- }
+ checkStorageContainsRows(storage, rowsOnRebalance);
}
@Test
- public void testFailRebalance() throws Exception {
+ public void testFailRebalance() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
// Nothing will happen because rebalance has not started.
- storage.abortRebalance().get(1, SECONDS);
+ assertThat(storage.abortRebalance(), willCompleteSuccessfully());
List<IgniteBiTuple<UUID, TxMeta>> rowsBeforeStartRebalance = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
@@ -355,15 +349,13 @@ public abstract class AbstractTxStateStorageTest {
// Let's check the storage.
checkLastApplied(storage, 0, 0, 0);
- try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
- assertThat(scan.stream().collect(toList()), is(empty()));
- }
+ checkStorageIsEmpty(storage);
}
@Test
public void testStartRebalanceForClosedOrDestroedPartition() {
TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
- TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(0);
+ TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
storage0.close();
storage1.destroy();
@@ -372,6 +364,64 @@ public abstract class AbstractTxStateStorageTest {
assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage1::startRebalance);
}
+ @Test
+ void testClear() {
+ TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
+
+ // Cleaning up on empty storage should not generate errors.
+ assertThat(storage.clear(), willCompleteSuccessfully());
+
+ checkLastApplied(storage, 0, 0, 0);
+ checkStorageIsEmpty(storage);
+
+ List<IgniteBiTuple<UUID, TxMeta>> rows = List.of(
+ randomTxMetaTuple(1, UUID.randomUUID()),
+ randomTxMetaTuple(1, UUID.randomUUID())
+ );
+
+ fillStorage(storage, rows);
+
+ // Cleanup the non-empty storage.
+ assertThat(storage.clear(), willCompleteSuccessfully());
+
+ checkLastApplied(storage, 0, 0, 0);
+ checkStorageIsEmpty(storage);
+ }
+
+ @Test
+ void testCleanOnClosedDestroyedAndRebalancedStorages() {
+ TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
+ TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
+ TxStateStorage storage2 = tableStorage.getOrCreateTxStateStorage(2);
+
+ storage0.close();
+ storage1.destroy();
+ assertThat(storage2.startRebalance(), willCompleteSuccessfully());
+
+ // storage2 stays in rebalance while clear() is attempted, so it must fail with a rebalance error; the rebalance is aborted
+ // afterwards regardless of the assertion results.
+ try {
+ assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage0::clear);
+ assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage1::clear);
+ assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, storage2::clear);
+ } finally {
+ assertThat(storage2.abortRebalance(), willCompleteSuccessfully());
+ }
+ }
+
+ /** Asserts that a scan of the storage returns no rows. */
+ private static void checkStorageIsEmpty(TxStateStorage storage) {
+ try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
+ assertThat(scan.stream().collect(toList()), is(empty()));
+ }
+ }
+
+ /** Asserts that a scan of the storage returns exactly the expected rows, in any order. */
+ protected static void checkStorageContainsRows(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> expRows) {
+ try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
+ assertThat(
+ scan.stream().collect(toList()),
+ containsInAnyOrder(expRows.toArray(new IgniteBiTuple[0]))
+ );
+ }
+ }
+
private void startRebalanceWithChecks(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
fillStorage(storage, rows);
@@ -446,8 +496,7 @@ public abstract class AbstractTxStateStorageTest {
for (int i = 0; i < rows.size(); i++) {
IgniteBiTuple<UUID, TxMeta> row = rows.get(i);
- // If even.
- if ((i & 1) == 0) {
+ // Even-indexed rows are inserted with compareAndSet, odd-indexed rows with put.
+ if ((i % 2) == 0) {
assertTrue(storage.compareAndSet(row.get1(), null, row.get2(), i * 10L, i * 10L));
} else {
storage.put(row.get1(), row.get2());
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index ebbf24309f..a8eee03a35 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -231,6 +231,22 @@ public class TestTxStateStorage implements TxStateStorage {
.whenComplete((unused, throwable) -> lastApplied(lastAppliedIndex, lastAppliedTerm));
}
+ @Override
+ public CompletableFuture<Void> clear() {
+ checkStorageClosed();
+
+ // Clearing is not allowed while a rebalance is in progress.
+ if (rebalanceFutureReference.get() != null) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, "In the process of rebalancing");
+ }
+
+ // Unlike the RocksDB implementation there is no CLEANUP state: the data is cleared synchronously and a completed future is
+ // returned.
+ storage.clear();
+
+ // NOTE(review): persistedIndex is not reset explicitly here; confirm it is derived from lastAppliedIndex in this test storage.
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+
+ return completedFuture(null);
+ }
+
private void checkStorageInProgreesOfRebalance() {
if (rebalanceFutureReference.get() != null) {
throwRebalanceInProgressException();