You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/07 15:29:52 UTC
(ignite-3) branch main updated: IGNITE-20796 Refactor replica stop in TableManager (#2807)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5537f031dd IGNITE-20796 Refactor replica stop in TableManager (#2807)
5537f031dd is described below
commit 5537f031dd24cca844ffac807a177ee0ca525a9b
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Nov 7 17:29:46 2023 +0200
IGNITE-20796 Refactor replica stop in TableManager (#2807)
---
.../internal/table/distributed/TableManager.java | 191 +++++++++++----------
1 file changed, 97 insertions(+), 94 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6d2e40f546..4fb2bd45ca 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -853,12 +853,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
raftGroupService
);
+ CompletableFuture<Void> whenReplicaReady = allOf(
+ ((Loza) raftMgr).raftNodeReadyFuture(replicaGrpId),
+ table.pkIndexesReadyFuture()
+ );
+
replicaMgr.startReplica(
replicaGrpId,
- allOf(
- ((Loza) raftMgr).raftNodeReadyFuture(replicaGrpId),
- table.pkIndexesReadyFuture()
- ),
+ whenReplicaReady,
listener,
raftGroupService,
storageIndexTracker
@@ -1012,17 +1014,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
InternalTable internalTable = table.internalTable();
- for (int p = 0; p < internalTable.partitions(); p++) {
- TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p);
-
- stopping.add(() -> closePartitionTrackers(internalTable, replicationGroupId.partitionId()));
+ stopping.add(() -> {
+ var stopReplicaFutures = new CompletableFuture<?>[internalTable.partitions()];
- stopping.add(() -> replicaMgr.stopReplica(replicationGroupId).get(10, TimeUnit.SECONDS));
+ for (int p = 0; p < internalTable.partitions(); p++) {
+ TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p);
- stopping.add(() -> raftMgr.stopRaftNodes(replicationGroupId));
+ stopReplicaFutures[p] = stopPartition(replicationGroupId, table);
+ }
- stopping.add(() -> mvGc.removeStorage(replicationGroupId).get(10, TimeUnit.SECONDS));
- }
+ allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
+ });
stopping.add(internalTable.storage());
stopping.add(internalTable.txStateStorage());
@@ -1273,71 +1275,58 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
int tableId = tableDescriptor.id();
int partitions = zoneDescriptor.partitions();
- try {
- CompletableFuture<?>[] removeStorageFromGcFutures = new CompletableFuture<?>[partitions];
-
- for (int p = 0; p < partitions; p++) {
- TablePartitionId replicationGroupId = new TablePartitionId(tableId, p);
-
- raftMgr.stopRaftNodes(replicationGroupId);
-
- removeStorageFromGcFutures[p] = replicaMgr
- .stopReplica(replicationGroupId)
- .thenCompose((notUsed) -> mvGc.removeStorage(replicationGroupId));
+ localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
}
- localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
- if (e != null) {
- return failedFuture(e);
- }
+ var newMap = new HashMap<>(previousVal);
+ newMap.remove(tableId);
- var newMap = new HashMap<>(previousVal);
- newMap.remove(tableId);
+ return completedFuture(newMap);
+ }));
- return completedFuture(newMap);
- }));
+ tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
- tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
- if (e != null) {
- return failedFuture(e);
- }
+ var map = new HashMap<>(previousVal);
- var map = new HashMap<>(previousVal);
+ TableImpl table = map.remove(tableId);
- TableImpl table = map.remove(tableId);
+ assert table != null : tableId;
- assert table != null : tableId;
+ InternalTable internalTable = table.internalTable();
- InternalTable internalTable = table.internalTable();
+ CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions];
- for (int partitionId = 0; partitionId < partitions; partitionId++) {
- closePartitionTrackers(internalTable, partitionId);
- }
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ var replicationGroupId = new TablePartitionId(tableId, partitionId);
- // TODO: IGNITE-18703 Destroy raft log and meta
+ stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table);
+ }
- CompletableFuture<Void> destroyTableStoragesFuture = allOf(removeStorageFromGcFutures)
- .thenCompose(unused -> allOf(
- internalTable.storage().destroy(),
- runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor))
- );
+ // TODO: IGNITE-18703 Destroy raft log and meta
+ CompletableFuture<Void> destroyTableStoragesFuture = allOf(stopReplicaFutures)
+ .thenCompose(unused -> allOf(
+ internalTable.storage().destroy(),
+ runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor))
+ );
- CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId());
+ CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId());
- return allOf(destroyTableStoragesFuture, dropSchemaRegistryFuture)
- .thenApply(v -> map);
- }));
+ return allOf(destroyTableStoragesFuture, dropSchemaRegistryFuture)
+ .thenApply(v -> map);
+ }));
- startedTables.remove(tableId);
+ startedTables.remove(tableId);
- Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
- .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p)))
- .collect(Collectors.toSet());
+ Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
+ .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p)))
+ .collect(Collectors.toSet());
- metaStorageMgr.removeAll(assignmentKeys);
- } catch (NodeStoppingException e) {
- // No op.
- }
+ metaStorageMgr.removeAll(assignmentKeys);
}
private CompletableFuture<Set<Assignment>> calculateAssignments(TablePartitionId tablePartitionId) {
@@ -2078,51 +2067,65 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}, ioExecutor);
}
- private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long revision) {
- try {
- raftMgr.stopRaftNodes(tablePartitionId);
- } catch (NodeStoppingException e) {
- // No-op
+ private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long causalityToken) {
+ return tablesById(causalityToken)
+ .thenCompose(tables -> {
+ TableImpl table = tables.get(tablePartitionId.tableId());
+
+ return stopPartition(tablePartitionId, table)
+ .thenCompose(v -> destroyPartitionStorages(tablePartitionId, table));
+ });
+ }
+
+ /**
+ * Stops all resources associated with a given partition, like replicas and partition trackers.
+ *
+ * @param tablePartitionId Partition ID.
+ * @param table Table which this partition belongs to.
+ * @return Future that will be completed after all resources have been closed.
+ */
+ private CompletableFuture<Void> stopPartition(TablePartitionId tablePartitionId, TableImpl table) {
+ // TODO: IGNITE-19905 - remove the check.
+ if (table != null) {
+ closePartitionTrackers(table.internalTable(), tablePartitionId.partitionId());
}
- CompletableFuture<Boolean> stopReplicaFut;
+ CompletableFuture<Boolean> stopReplicaFuture;
+
try {
- stopReplicaFut = replicaMgr.stopReplica(tablePartitionId);
+ stopReplicaFuture = replicaMgr.stopReplica(tablePartitionId);
} catch (NodeStoppingException e) {
- stopReplicaFut = completedFuture(true);
+ // No-op.
+ stopReplicaFuture = completedFuture(false);
}
- return destroyPartitionStorages(tablePartitionId, revision, stopReplicaFut);
- }
-
- private CompletableFuture<Void> destroyPartitionStorages(
- TablePartitionId tablePartitionId,
- long revision,
- CompletableFuture<Boolean> stopReplicaFut
- ) {
- int partitionId = tablePartitionId.partitionId();
+ return stopReplicaFuture
+ .thenCompose(v -> {
+ try {
+ raftMgr.stopRaftNodes(tablePartitionId);
+ } catch (NodeStoppingException ignored) {
+ // No-op.
+ }
- return tablesById(revision)
- // TODO: IGNITE-18703 Destroy raft log and meta
- .thenCombine(mvGc.removeStorage(tablePartitionId), (tables, unused) -> {
- TableImpl table = tables.get(tablePartitionId.tableId());
+ return mvGc.removeStorage(tablePartitionId);
+ });
+ }
- // TODO: IGNITE-19905 - remove the check.
- if (table == null) {
- return allOf(stopReplicaFut);
- }
+ private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId tablePartitionId, TableImpl table) {
+ // TODO: IGNITE-18703 Destroy raft log and meta
+ // TODO: IGNITE-19905 - remove the check.
+ if (table == null) {
+ return completedFuture(null);
+ }
- InternalTable internalTable = table.internalTable();
+ InternalTable internalTable = table.internalTable();
- closePartitionTrackers(internalTable, partitionId);
+ int partitionId = tablePartitionId.partitionId();
- return allOf(
- stopReplicaFut,
- internalTable.storage().destroyPartition(partitionId),
- runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
- );
- })
- .thenCompose(Function.identity());
+ return allOf(
+ internalTable.storage().destroyPartition(partitionId),
+ runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+ );
}
private int[] collectTableIndexIds(int tableId, int catalogVersion) {