You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/07 15:29:52 UTC

(ignite-3) branch main updated: IGNITE-20796 Refactor replica stop in TableManager (#2807)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5537f031dd IGNITE-20796 Refactor replica stop in TableManager (#2807)
5537f031dd is described below

commit 5537f031dd24cca844ffac807a177ee0ca525a9b
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Nov 7 17:29:46 2023 +0200

    IGNITE-20796 Refactor replica stop in TableManager (#2807)
---
 .../internal/table/distributed/TableManager.java   | 191 +++++++++++----------
 1 file changed, 97 insertions(+), 94 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6d2e40f546..4fb2bd45ca 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -853,12 +853,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 raftGroupService
         );
 
+        CompletableFuture<Void> whenReplicaReady = allOf(
+                ((Loza) raftMgr).raftNodeReadyFuture(replicaGrpId),
+                table.pkIndexesReadyFuture()
+        );
+
         replicaMgr.startReplica(
                 replicaGrpId,
-                allOf(
-                        ((Loza) raftMgr).raftNodeReadyFuture(replicaGrpId),
-                        table.pkIndexesReadyFuture()
-                ),
+                whenReplicaReady,
                 listener,
                 raftGroupService,
                 storageIndexTracker
@@ -1012,17 +1014,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
                 InternalTable internalTable = table.internalTable();
 
-                for (int p = 0; p < internalTable.partitions(); p++) {
-                    TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p);
-
-                    stopping.add(() -> closePartitionTrackers(internalTable, replicationGroupId.partitionId()));
+                stopping.add(() -> {
+                    var stopReplicaFutures = new CompletableFuture<?>[internalTable.partitions()];
 
-                    stopping.add(() -> replicaMgr.stopReplica(replicationGroupId).get(10, TimeUnit.SECONDS));
+                    for (int p = 0; p < internalTable.partitions(); p++) {
+                        TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p);
 
-                    stopping.add(() -> raftMgr.stopRaftNodes(replicationGroupId));
+                        stopReplicaFutures[p] = stopPartition(replicationGroupId, table);
+                    }
 
-                    stopping.add(() -> mvGc.removeStorage(replicationGroupId).get(10, TimeUnit.SECONDS));
-                }
+                    allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
+                });
 
                 stopping.add(internalTable.storage());
                 stopping.add(internalTable.txStateStorage());
@@ -1273,71 +1275,58 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         int tableId = tableDescriptor.id();
         int partitions = zoneDescriptor.partitions();
 
-        try {
-            CompletableFuture<?>[] removeStorageFromGcFutures = new CompletableFuture<?>[partitions];
-
-            for (int p = 0; p < partitions; p++) {
-                TablePartitionId replicationGroupId = new TablePartitionId(tableId, p);
-
-                raftMgr.stopRaftNodes(replicationGroupId);
-
-                removeStorageFromGcFutures[p] = replicaMgr
-                        .stopReplica(replicationGroupId)
-                        .thenCompose((notUsed) -> mvGc.removeStorage(replicationGroupId));
+        localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
+            if (e != null) {
+                return failedFuture(e);
             }
 
-            localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
-                if (e != null) {
-                    return failedFuture(e);
-                }
+            var newMap = new HashMap<>(previousVal);
+            newMap.remove(tableId);
 
-                var newMap = new HashMap<>(previousVal);
-                newMap.remove(tableId);
+            return completedFuture(newMap);
+        }));
 
-                return completedFuture(newMap);
-            }));
+        tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
 
-            tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
-                if (e != null) {
-                    return failedFuture(e);
-                }
+            var map = new HashMap<>(previousVal);
 
-                var map = new HashMap<>(previousVal);
+            TableImpl table = map.remove(tableId);
 
-                TableImpl table = map.remove(tableId);
+            assert table != null : tableId;
 
-                assert table != null : tableId;
+            InternalTable internalTable = table.internalTable();
 
-                InternalTable internalTable = table.internalTable();
+            CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions];
 
-                for (int partitionId = 0; partitionId < partitions; partitionId++) {
-                    closePartitionTrackers(internalTable, partitionId);
-                }
+            for (int partitionId = 0; partitionId < partitions; partitionId++) {
+                var replicationGroupId = new TablePartitionId(tableId, partitionId);
 
-                // TODO: IGNITE-18703 Destroy raft log and meta
+                stopReplicaFutures[partitionId] = stopPartition(replicationGroupId, table);
+            }
 
-                CompletableFuture<Void> destroyTableStoragesFuture = allOf(removeStorageFromGcFutures)
-                        .thenCompose(unused -> allOf(
-                                internalTable.storage().destroy(),
-                                runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor))
-                        );
+            // TODO: IGNITE-18703 Destroy raft log and meta
+            CompletableFuture<Void> destroyTableStoragesFuture = allOf(stopReplicaFutures)
+                    .thenCompose(unused -> allOf(
+                            internalTable.storage().destroy(),
+                            runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor))
+                    );
 
-                CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId());
+            CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId());
 
-                return allOf(destroyTableStoragesFuture, dropSchemaRegistryFuture)
-                        .thenApply(v -> map);
-            }));
+            return allOf(destroyTableStoragesFuture, dropSchemaRegistryFuture)
+                    .thenApply(v -> map);
+        }));
 
-            startedTables.remove(tableId);
+        startedTables.remove(tableId);
 
-            Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
-                    .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p)))
-                    .collect(Collectors.toSet());
+        Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
+                .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p)))
+                .collect(Collectors.toSet());
 
-            metaStorageMgr.removeAll(assignmentKeys);
-        } catch (NodeStoppingException e) {
-            // No op.
-        }
+        metaStorageMgr.removeAll(assignmentKeys);
     }
 
     private CompletableFuture<Set<Assignment>> calculateAssignments(TablePartitionId tablePartitionId) {
@@ -2078,51 +2067,65 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 }, ioExecutor);
     }
 
-    private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long revision) {
-        try {
-            raftMgr.stopRaftNodes(tablePartitionId);
-        } catch (NodeStoppingException e) {
-            // No-op
+    private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long causalityToken) {
+        return tablesById(causalityToken)
+                .thenCompose(tables -> {
+                    TableImpl table = tables.get(tablePartitionId.tableId());
+
+                    return stopPartition(tablePartitionId, table)
+                            .thenCompose(v -> destroyPartitionStorages(tablePartitionId, table));
+                });
+    }
+
+    /**
+     * Stops all resources associated with a given partition, like replicas and partition trackers.
+     *
+     * @param tablePartitionId Partition ID.
+     * @param table Table which this partition belongs to.
+     * @return Future that will be completed after all resources have been closed.
+     */
+    private CompletableFuture<Void> stopPartition(TablePartitionId tablePartitionId, TableImpl table) {
+        // TODO: IGNITE-19905 - remove the check.
+        if (table != null) {
+            closePartitionTrackers(table.internalTable(), tablePartitionId.partitionId());
         }
 
-        CompletableFuture<Boolean> stopReplicaFut;
+        CompletableFuture<Boolean> stopReplicaFuture;
+
         try {
-            stopReplicaFut = replicaMgr.stopReplica(tablePartitionId);
+            stopReplicaFuture = replicaMgr.stopReplica(tablePartitionId);
         } catch (NodeStoppingException e) {
-            stopReplicaFut = completedFuture(true);
+            // No-op.
+            stopReplicaFuture = completedFuture(false);
         }
 
-        return destroyPartitionStorages(tablePartitionId, revision, stopReplicaFut);
-    }
-
-    private CompletableFuture<Void> destroyPartitionStorages(
-            TablePartitionId tablePartitionId,
-            long revision,
-            CompletableFuture<Boolean> stopReplicaFut
-    ) {
-        int partitionId = tablePartitionId.partitionId();
+        return stopReplicaFuture
+                .thenCompose(v -> {
+                    try {
+                        raftMgr.stopRaftNodes(tablePartitionId);
+                    } catch (NodeStoppingException ignored) {
+                        // No-op.
+                    }
 
-        return tablesById(revision)
-                // TODO: IGNITE-18703 Destroy raft log and meta
-                .thenCombine(mvGc.removeStorage(tablePartitionId), (tables, unused) -> {
-                    TableImpl table = tables.get(tablePartitionId.tableId());
+                    return mvGc.removeStorage(tablePartitionId);
+                });
+    }
 
-                    // TODO: IGNITE-19905 - remove the check.
-                    if (table == null) {
-                        return allOf(stopReplicaFut);
-                    }
+    private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId tablePartitionId, TableImpl table) {
+        // TODO: IGNITE-18703 Destroy raft log and meta
+        // TODO: IGNITE-19905 - remove the check.
+        if (table == null) {
+            return completedFuture(null);
+        }
 
-                    InternalTable internalTable = table.internalTable();
+        InternalTable internalTable = table.internalTable();
 
-                    closePartitionTrackers(internalTable, partitionId);
+        int partitionId = tablePartitionId.partitionId();
 
-                    return allOf(
-                            stopReplicaFut,
-                            internalTable.storage().destroyPartition(partitionId),
-                            runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
-                    );
-                })
-                .thenCompose(Function.identity());
+        return allOf(
+                internalTable.storage().destroyPartition(partitionId),
+                runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+        );
     }
 
     private int[] collectTableIndexIds(int tableId, int catalogVersion) {