You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/23 06:29:19 UTC
[ignite-3] branch main updated: IGNITE-18596 Add RaftGroupConfiguration to MvTableStorage#finishRebalancePartition (#1557)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dc6eded69c IGNITE-18596 Add RaftGroupConfiguration to MvTableStorage#finishRebalancePartition (#1557)
dc6eded69c is described below
commit dc6eded69c7823be7a8ccea24d25d901b11bfd0e
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Jan 23 09:29:14 2023 +0300
IGNITE-18596 Add RaftGroupConfiguration to MvTableStorage#finishRebalancePartition (#1557)
---
.../internal/storage/engine/MvTableStorage.java | 24 +++++++---
.../storage/AbstractMvTableStorageTest.java | 51 +++++++++++++++++++---
.../storage/impl/TestMvPartitionStorage.java | 7 ++-
.../internal/storage/impl/TestMvTableStorage.java | 10 ++++-
.../pagememory/AbstractPageMemoryTableStorage.java | 10 ++++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 6 +++
.../mv/PersistentPageMemoryMvPartitionStorage.java | 51 +++++++++++++---------
.../mv/VolatilePageMemoryMvPartitionStorage.java | 9 +++-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 22 +++++++---
.../storage/rocksdb/RocksDbTableStorage.java | 10 ++++-
10 files changed, 156 insertions(+), 44 deletions(-)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 555882a924..6e4619759d 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -177,8 +178,9 @@ public interface MvTableStorage extends ManuallyCloseable {
* <ul>
* <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
* and {@link SortedIndexStorage});</li>
- * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
- * {@link MvPartitionStorage#REBALANCE_IN_PROGRESS};</li>
+ * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()}, {@link MvPartitionStorage#lastAppliedTerm()} to
+ * {@link MvPartitionStorage#REBALANCE_IN_PROGRESS} and {@link MvPartitionStorage#committedGroupConfiguration()} to {@code null};
+ * </li>
* <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
* {@link Cursor#next()} will throw {@link StorageRebalanceException};</li>
* <li>For a multi-version partition storage and its indexes, methods for reading and writing data will throw
@@ -189,6 +191,7 @@ public interface MvTableStorage extends ManuallyCloseable {
* <li>{@link MvPartitionStorage#lastAppliedIndex()};</li>
* <li>{@link MvPartitionStorage#lastAppliedTerm()};</li>
* <li>{@link MvPartitionStorage#persistedIndex()};</li>
+ * <li>{@link MvPartitionStorage#committedGroupConfiguration()};</li>
* <li>{@link HashIndexStorage#put(IndexRow)};</li>
* <li>{@link SortedIndexStorage#put(IndexRow)};</li>
* </ul></li>
@@ -198,7 +201,8 @@ public interface MvTableStorage extends ManuallyCloseable {
* to one of the methods:
* <ul>
* <li>{@link #abortRebalancePartition(int)} - in case of errors or cancellation of rebalance;</li>
- * <li>{@link #finishRebalancePartition(int, long, long)} - in case of successful completion of rebalance.</li>
+ * <li>{@link #finishRebalancePartition(int, long, long, RaftGroupConfiguration)} - in case of successful completion of rebalance.
+ * </li>
* </ul>
*
* <p>If the {@link MvPartitionStorage#lastAppliedIndex()} is {@link MvPartitionStorage#REBALANCE_IN_PROGRESS} after a node restart
@@ -218,7 +222,8 @@ public interface MvTableStorage extends ManuallyCloseable {
* <ul>
* <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
* and {@link SortedIndexStorage});</li>
- * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to {@code 0};</li>
+ * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()}, {@link MvPartitionStorage#lastAppliedTerm()} to {@code 0} and
+ * {@link MvPartitionStorage#committedGroupConfiguration()} to {@code null};</li>
* <li>For a multi-version partition storage and its indexes, methods for writing and reading will be available.</li>
* </ul>
*
@@ -232,7 +237,8 @@ public interface MvTableStorage extends ManuallyCloseable {
/**
* Completes rebalance for a partition.
* <ul>
- * <li>Updates {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()};</li>
+ * <li>Updates {@link MvPartitionStorage#lastAppliedIndex()}, {@link MvPartitionStorage#lastAppliedTerm()} and
+ * {@link MvPartitionStorage#committedGroupConfiguration()};</li>
* <li>For a multi-version partition storage and its indexes, methods for writing and reading will be available.</li>
* </ul>
*
@@ -240,9 +246,15 @@ public interface MvTableStorage extends ManuallyCloseable {
*
* @param lastAppliedIndex Last applied index.
* @param lastAppliedTerm Last applied term.
+ * @param raftGroupConfig RAFT group configuration.
* @return Future of the finish rebalance for a multi-version partition storage and its indexes.
* @throws IllegalArgumentException If Partition ID is out of bounds.
* @throws StorageRebalanceException If there is an error when completing rebalance.
*/
- CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm);
+ CompletableFuture<Void> finishRebalancePartition(
+ int partitionId,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig
+ );
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index b12d6b4f25..8715cfe9b6 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
@@ -43,6 +44,7 @@ import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
@@ -425,13 +427,16 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
}
@Test
- public void testSuccessRebalance() throws Exception {
+ public void testSuccessRebalance() {
MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
// Error because rebalance has not yet started for the partition.
- assertThrows(StorageRebalanceException.class, () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100, 500));
+ assertThrows(
+ StorageRebalanceException.class,
+ () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100, 500, mock(RaftGroupConfiguration.class))
+ );
List<IgniteTuple3<RowId, TableRow, HybridTimestamp>> rowsBeforeRebalanceStart = List.of(
new IgniteTuple3<>(new RowId(PARTITION_ID), tableRow(new TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
@@ -455,22 +460,32 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);
checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ assertNull(mvPartitionStorage.committedGroupConfiguration());
// Let's finish rebalancing.
// Partition is out of configuration range.
- assertThrows(IllegalArgumentException.class, () -> tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500, mock(RaftGroupConfiguration.class))
+ );
// Partition does not exist.
- assertThrows(StorageRebalanceException.class, () -> tableStorage.finishRebalancePartition(1, 100, 500));
+ assertThrows(
+ StorageRebalanceException.class,
+ () -> tableStorage.finishRebalancePartition(1, 100, 500, mock(RaftGroupConfiguration.class))
+ );
+
+ RaftGroupConfiguration raftGroupConfig = createRandomRaftGroupConfiguration();
- assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20), willCompleteSuccessfully());
+ assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20, raftGroupConfig), willCompleteSuccessfully());
// Let's check the storages after success finish rebalance.
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);
checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);
checkLastApplied(mvPartitionStorage, 10, 10, 20);
+ checkRaftGroupConfigs(raftGroupConfig, mvPartitionStorage.committedGroupConfiguration());
}
@Test
@@ -517,6 +532,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);
checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ assertNull(mvPartitionStorage.committedGroupConfiguration());
}
@Test
@@ -772,10 +788,13 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
private void checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ assertNull(storage.committedGroupConfiguration());
+
assertDoesNotThrow(() -> storage.committedGroupConfiguration());
storage.runConsistently(() -> {
assertThrows(StorageRebalanceException.class, () -> storage.lastApplied(100, 500));
+ assertThrows(StorageRebalanceException.class, () -> storage.committedGroupConfiguration(mock(RaftGroupConfiguration.class)));
assertThrows(
StorageRebalanceException.class,
@@ -917,4 +936,26 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
return cursor.stream().map(ReadResult::tableRow).map(TableRow::bytes).collect(toList());
}
}
+
+ private static RaftGroupConfiguration createRandomRaftGroupConfiguration() {
+ Random random = new Random(System.currentTimeMillis());
+
+ return new RaftGroupConfiguration(
+ random.ints(random.nextInt(10)).mapToObj(i -> "peer" + i).collect(toList()),
+ random.ints(random.nextInt(10)).mapToObj(i -> "lerner" + i).collect(toList()),
+ random.ints(random.nextInt(10)).mapToObj(i -> "oldPeer" + i).collect(toList()),
+ random.ints(random.nextInt(10)).mapToObj(i -> "oldLerner" + i).collect(toList())
+ );
+ }
+
+ private static void checkRaftGroupConfigs(RaftGroupConfiguration exp, RaftGroupConfiguration act) {
+ assertNotNull(exp);
+ assertNotNull(act);
+
+ assertThat(act.peers(), equalTo(exp.peers()));
+ assertThat(act.learners(), equalTo(exp.learners()));
+
+ assertThat(act.oldPeers(), equalTo(exp.oldPeers()));
+ assertThat(act.oldLearners(), equalTo(exp.oldLearners()));
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index e136eb841c..814e230c69 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -580,6 +580,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
lastAppliedIndex = REBALANCE_IN_PROGRESS;
lastAppliedTerm = REBALANCE_IN_PROGRESS;
+
+ groupConfig = null;
}
void abortRebalance() {
@@ -595,9 +597,11 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
lastAppliedIndex = 0;
lastAppliedTerm = 0;
+
+ groupConfig = null;
}
- void finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+ void finishRebalance(long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
checkStorageClosed();
assert rebalance;
@@ -606,6 +610,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
+ this.groupConfig = raftGroupConfig;
}
boolean closed() {
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index dac39a6201..c416ed4053 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -307,7 +308,12 @@ public class TestMvTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
+ public CompletableFuture<Void> finishRebalancePartition(
+ int partitionId,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig
+ ) {
checkPartitionId(partitionId);
CompletableFuture<Void> rebalanceFuture = rebalanceFutureByPartitionId.remove(partitionId);
@@ -324,7 +330,7 @@ public class TestMvTableStorage implements MvTableStorage {
return rebalanceFuture
.thenAccept(unused -> {
- partitionStorage.finishRebalance(lastAppliedIndex, lastAppliedTerm);
+ partitionStorage.finishRebalance(lastAppliedIndex, lastAppliedTerm, raftGroupConfig);
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::finishRebalance);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index bb671be899..6e6b2bf4e5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -378,7 +379,12 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
+ public CompletableFuture<Void> finishRebalancePartition(
+ int partitionId,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig
+ ) {
return inBusyLock(busyLock, () -> {
AbstractPageMemoryMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
@@ -396,6 +402,8 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
mvPartitionStorage.runConsistently(() -> {
mvPartitionStorage.lastAppliedOnRebalance(lastAppliedIndex, lastAppliedTerm);
+ mvPartitionStorage.committedGroupConfigurationOnRebalance(raftGroupConfig);
+
return null;
});
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 4fdb1cb59c..1af00faf88 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
@@ -1106,4 +1107,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
* Returns resources that will have to close on rebalancing.
*/
abstract List<AutoCloseable> getResourcesToCloseOnRebalance();
+
+ /**
+ * Sets the RAFT group configuration on rebalance.
+ */
+ public abstract void committedGroupConfigurationOnRebalance(RaftGroupConfiguration config);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index a19abb6ddf..0fd7a18b44 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -233,33 +233,37 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
- assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
-
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
- UUID lastCheckpointId = lastCheckpoint == null ? null : lastCheckpoint.id();
+ committedGroupConfigurationBusy(config);
- byte[] groupConfigBytes = replicationProtocolGroupConfigToBytes(config);
+ return null;
+ });
+ }
- replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
+ private void committedGroupConfigurationBusy(RaftGroupConfiguration config) {
+ assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
- try {
- if (meta.lastReplicationProtocolGroupConfigFirstPageId() == BlobStorage.NO_PAGE_ID) {
- long configPageId = blobStorage.addBlob(groupConfigBytes);
+ CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress();
+ UUID lastCheckpointId = lastCheckpoint == null ? null : lastCheckpoint.id();
- meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, configPageId);
- } else {
- blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), groupConfigBytes);
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot save committed group configuration, groupId=" + groupId + ", partitionId=" + groupId, e);
- } finally {
- replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
- }
+ byte[] groupConfigBytes = replicationProtocolGroupConfigToBytes(config);
- return null;
- });
+ replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
+
+ try {
+ if (meta.lastReplicationProtocolGroupConfigFirstPageId() == BlobStorage.NO_PAGE_ID) {
+ long configPageId = blobStorage.addBlob(groupConfigBytes);
+
+ meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, configPageId);
+ } else {
+ blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), groupConfigBytes);
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot save committed group configuration, groupId=" + groupId + ", partitionId=" + groupId, e);
+ } finally {
+ replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
+ }
}
@Nullable
@@ -396,4 +400,11 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
blobStorage::close
);
}
+
+ @Override
+ public void committedGroupConfigurationOnRebalance(RaftGroupConfiguration config) {
+ throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ committedGroupConfigurationBusy(config);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 7779930359..88f9a06182 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -141,7 +141,7 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
}
@Override
- public void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
+ public void lastAppliedOnRebalance(long lastAppliedIndex, long lastAppliedTerm) {
throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
this.lastAppliedIndex = lastAppliedIndex;
@@ -220,4 +220,11 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
);
}
}
+
+ @Override
+ public void committedGroupConfigurationOnRebalance(RaftGroupConfiguration config) throws StorageException {
+ throwExceptionIfStorageNotInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ this.groupConfig = config;
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4a15f08c72..4c408fc3e2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -356,12 +356,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
busy(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
- WriteBatchWithIndex writeBatch = requireWriteBatch();
-
try {
- writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
-
- pendingGroupConfig = config;
+ saveRaftGroupConfiguration(requireWriteBatch(), config);
return null;
} catch (RocksDBException e) {
@@ -370,6 +366,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
});
}
+ private void saveRaftGroupConfiguration(AbstractWriteBatch writeBatch, RaftGroupConfiguration config) throws RocksDBException {
+ writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
+
+ pendingGroupConfig = config;
+ }
+
/**
* Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
* {@link #persistedIndex()}.
@@ -1505,13 +1507,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
*
* @throws StorageRebalanceException If there was an error when finishing the rebalance.
*/
- void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+ void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
}
try {
saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+ saveRaftGroupConfigurationOnRebalance(writeBatch, raftGroupConfig);
} catch (RocksDBException e) {
throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
}
@@ -1533,4 +1537,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
persistedIndex = lastAppliedIndex;
}
+
+ private void saveRaftGroupConfigurationOnRebalance(WriteBatch writeBatch, RaftGroupConfiguration config) throws RocksDBException {
+ saveRaftGroupConfiguration(writeBatch, config);
+
+ this.lastGroupConfig = config;
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 811a30dcc0..2256c930b4 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -710,7 +711,12 @@ public class RocksDbTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
+ public CompletableFuture<Void> finishRebalancePartition(
+ int partitionId,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig
+ ) {
return inBusyLock(busyLock, () -> {
RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
@@ -725,7 +731,7 @@ public class RocksDbTableStorage implements MvTableStorage {
}
try (WriteBatch writeBatch = new WriteBatch()) {
- mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm, raftGroupConfig);
getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance);
getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance);