You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/30 14:48:50 UTC

[ignite-3] branch main updated: IGNITE-15711 Implemented Loza.stop(). Fixes #482

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dde01f1  IGNITE-15711 Implemented Loza.stop(). Fixes #482
dde01f1 is described below

commit dde01f10ed2fd119be756896919a8666fdb0a774
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Nov 30 17:48:24 2021 +0300

    IGNITE-15711 Implemented Loza.stop(). Fixes #482
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../internal/metastorage/MetaStorageManager.java   |  20 ++--
 .../java/org/apache/ignite/internal/raft/Loza.java | 105 +++++++++++++++-
 .../org/apache/ignite/internal/raft/LozaTest.java  |  85 +++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  15 +--
 .../internal/table/distributed/TableManager.java   | 132 ++++++++++++---------
 .../ignite/internal/table/TableManagerTest.java    |  39 ++++--
 6 files changed, 299 insertions(+), 97 deletions(-)

diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 56c3dd4..8c2b448 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -198,13 +198,17 @@ public class MetaStorageManager implements IgniteComponent {
             }
 
             storage.start();
-
-            raftGroupServiceFut = raftMgr.prepareRaftGroup(
-                    METASTORAGE_RAFT_GROUP_NAME,
-                    metaStorageMembers,
-                    () -> new MetaStorageListener(storage)
-            );
-
+    
+            try {
+                raftGroupServiceFut = raftMgr.prepareRaftGroup(
+                        METASTORAGE_RAFT_GROUP_NAME,
+                        metaStorageMembers,
+                        () -> new MetaStorageListener(storage)
+                );
+            } catch (NodeStoppingException e) {
+                throw new AssertionError("Loza was stopped before Meta Storage manager", e);
+            }
+    
             this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
                     new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
             );
@@ -283,6 +287,8 @@ public class MetaStorageManager implements IgniteComponent {
             LOG.error("Failed to get meta storage raft group service.");
 
             throw new IgniteInternalException(e);
+        } catch (NodeStoppingException e) {
+            throw new AssertionError("Loza was stopped before Meta Storage manager", e);
         }
 
         try {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index b525088..c73790d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -25,15 +25,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.LoggerMessageHelper;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Peer;
@@ -78,6 +81,12 @@ public class Loza implements IgniteComponent {
     /** Executor for raft group services. */
     private final ScheduledExecutorService executor;
 
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
@@ -123,7 +132,12 @@ public class Loza implements IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        // TODO: IGNITE-15161 Implement component's stop.
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
 
         raftServer.stop();
@@ -140,13 +154,35 @@ public class Loza implements IgniteComponent {
      * @param nodes        Raft group nodes.
      * @param lsnrSupplier Raft group listener supplier.
      * @return Future representing pending completion of the operation.
+     * @throws NodeStoppingException If node stopping intention was detected.
      */
     @Experimental
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
             String groupId,
             List<ClusterNode> nodes,
             Supplier<RaftGroupListener> lsnrSupplier
-    ) {
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Internal method for creating a raft group.
+     *
+     * @param groupId      Raft group id.
+     * @param nodes        Raft group nodes.
+     * @param lsnrSupplier Raft group listener supplier.
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String groupId, List<ClusterNode> nodes,
+            Supplier<RaftGroupListener> lsnrSupplier) {
         assert !nodes.isEmpty();
 
         List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -187,6 +223,7 @@ public class Loza implements IgniteComponent {
      * @param deltaNodes   New raft group nodes.
      * @param lsnrSupplier Raft group listener supplier.
      * @return Future representing pending completion of the operation.
+     * @throws NodeStoppingException If node stopping intention was detected.
      */
     @Experimental
     public CompletableFuture<RaftGroupService> updateRaftGroup(
@@ -194,7 +231,29 @@ public class Loza implements IgniteComponent {
             Collection<ClusterNode> nodes,
             Collection<ClusterNode> deltaNodes,
             Supplier<RaftGroupListener> lsnrSupplier
-    ) {
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return updateRaftGroupInternal(groupId, nodes, deltaNodes, lsnrSupplier);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Internal method for updating a raft group.
+     *
+     * @param groupId      Raft group id.
+     * @param nodes        Full set of raft group nodes.
+     * @param deltaNodes   New raft group nodes.
+     * @param lsnrSupplier Raft group listener supplier.
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<RaftGroupService> updateRaftGroupInternal(String groupId, Collection<ClusterNode> nodes,
+            Collection<ClusterNode> deltaNodes, Supplier<RaftGroupListener> lsnrSupplier) {
         assert !nodes.isEmpty();
 
         List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -230,8 +289,33 @@ public class Loza implements IgniteComponent {
      * @param expectedNodes List of nodes that contains the raft group peers.
      * @param changedNodes  List of nodes that will contain the raft group peers after.
      * @return Future which will complete when peers change.
+     * @throws NodeStoppingException If node stopping intention was detected.
      */
-    public CompletableFuture<Void> changePeers(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
+    public CompletableFuture<Void> changePeers(
+            String groupId,
+            List<ClusterNode> expectedNodes,
+            List<ClusterNode> changedNodes
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return changePeersInternal(groupId, expectedNodes, changedNodes);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Internal method for changing peers for a RAFT group.
+     *
+     * @param groupId       Raft group id.
+     * @param expectedNodes List of nodes that contains the raft group peers.
+     * @param changedNodes  List of nodes that will contain the raft group peers after.
+     * @return Future which will complete when peers change.
+     */
+    private CompletableFuture<Void> changePeersInternal(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
         List<Peer> expectedPeers = expectedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
         List<Peer> changedPeers = changedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
 
@@ -253,9 +337,18 @@ public class Loza implements IgniteComponent {
      * Stops a raft group on the current node.
      *
      * @param groupId Raft group id.
+     * @throws NodeStoppingException If node stopping intention was detected.
      */
-    public void stopRaftGroup(String groupId) {
-        raftServer.stopRaftGroup(groupId);
+    public void stopRaftGroup(String groupId) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            raftServer.stopRaftGroup(groupId);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
new file mode 100644
index 0000000..ad1dc2f
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for the RAFT manager.
+ * All components except Loza are mocked; the tests check the component's API methods under various conditions.
+ */
+@ExtendWith(MockitoExtension.class)
+public class LozaTest extends IgniteAbstractTest {
+    /** Mock for network service. */
+    @Mock
+    private ClusterService clusterNetSvc;
+
+    /**
+     * Checks that all API methods throw {@link org.apache.ignite.lang.NodeStoppingException}
+     * after Loza has been stopped.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLozaStop() throws Exception {
+        Mockito.doReturn(new ClusterLocalConfiguration("test_node", null)).when(clusterNetSvc).localConfiguration();
+        Mockito.doReturn(Mockito.mock(MessagingService.class)).when(clusterNetSvc).messagingService();
+        Mockito.doReturn(Mockito.mock(TopologyService.class)).when(clusterNetSvc).topologyService();
+
+        Loza loza = new Loza(clusterNetSvc, workDir);
+
+        loza.start();
+
+        loza.beforeNodeStop();
+        loza.stop();
+
+        String raftGroupId = "test_raft_group";
+
+        List<ClusterNode> nodes = List.of(
+                new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:123")));
+
+        List<ClusterNode> newNodes = List.of(
+                new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:124")),
+                new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:125")));
+
+        Supplier<RaftGroupListener> lsnrSupplier = () -> null;
+
+        assertThrows(NodeStoppingException.class, () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier));
+        assertThrows(NodeStoppingException.class, () -> loza.stopRaftGroup(raftGroupId));
+        assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier));
+        assertThrows(NodeStoppingException.class, () -> loza.changePeers(raftGroupId, nodes, newNodes));
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bdd66aa..5abb738 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
@@ -331,19 +330,7 @@ public class IgniteImpl implements Ignite {
      * Stops ignite node.
      */
     public void stop() {
-        AtomicBoolean explicitStop = new AtomicBoolean();
-
-        status.getAndUpdate(status -> {
-            if (status == Status.STARTED) {
-                explicitStop.set(true);
-            } else {
-                explicitStop.set(false);
-            }
-
-            return Status.STOPPING;
-        });
-
-        if (explicitStop.get()) {
+        if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
             doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
                     distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 98e51d0..50847d9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -328,20 +328,25 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             InternalTable internalTable = tablesById.get(tblId).internalTable();
 
                             // Create new raft nodes according to new assignments.
-                            futures[i] = raftMgr.updateRaftGroup(
-                                    raftGroupName(tblId, partId),
-                                    newPartitionAssignment,
-                                    toAdd,
-                                    () -> new PartitionListener(tblId,
-                                            new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
-                            ).thenAccept(
-                                    updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
-                                            partId, updatedRaftGroupService)
-                            ).exceptionally(th -> {
-                                LOG.error("Failed to update raft groups one the node", th);
-
-                                return null;
-                            });
+
+                            try {
+                                futures[i] = raftMgr.updateRaftGroup(
+                                        raftGroupName(tblId, partId),
+                                        newPartitionAssignment,
+                                        toAdd,
+                                        () -> new PartitionListener(tblId,
+                                                new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
+                                ).thenAccept(
+                                        updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
+                                                partId, updatedRaftGroupService)
+                                ).exceptionally(th -> {
+                                    LOG.error("Failed to update raft groups one the node", th);
+
+                                    return null;
+                                });
+                            } catch (NodeStoppingException e) {
+                                throw new AssertionError("Loza was stopped before Table manager", e);
+                            }
                         }
 
                         return CompletableFuture.allOf(futures);
@@ -493,13 +498,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         for (int p = 0; p < partitions; p++) {
             int partId = p;
 
-            partitionsGroupsFutures.add(
-                    raftMgr.prepareRaftGroup(
-                            raftGroupName(tblId, p),
-                            assignment.get(p),
-                            () -> new PartitionListener(tblId, new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
-                    )
-            );
+            try {
+                partitionsGroupsFutures.add(
+                        raftMgr.prepareRaftGroup(
+                                raftGroupName(tblId, p),
+                                assignment.get(p),
+                                () -> new PartitionListener(tblId,
+                                        new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
+                        )
+                );
+            } catch (NodeStoppingException e) {
+                throw new AssertionError("Loza was stopped before Table manager", e);
+            }
         }
 
         CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
@@ -706,43 +716,43 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             boolean exceptionWhenExist
     ) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
-    
+
         IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
-    
+
         EventListener<TableEventParameters> clo = new EventListener<>() {
             @Override
             public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
                 IgniteUuid notificationTblId = parameters.tableId();
-            
+
                 if (!tblId.equals(notificationTblId)) {
                     return false;
                 }
-            
+
                 if (e == null) {
                     tblFut.complete(parameters.table());
                 } else {
                     tblFut.completeExceptionally(e);
                 }
-            
+
                 return true;
             }
-        
+
             @Override
             public void remove(@NotNull Throwable e) {
                 tblFut.completeExceptionally(e);
             }
         };
-    
+
         listen(TableEvent.CREATE, clo);
-    
+
         tablesCfg.tables().change(change -> {
             if (change.get(name) != null) {
                 throw new TableAlreadyExistsException(name);
             }
-    
+
             change.create(name, (ch) -> {
                         tableInitChange.accept(ch);
-                
+
                         ((ExtendedTableChange) ch)
                                 // Table id specification.
                                 .changeId(tblId.toString())
@@ -756,7 +766,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                         String.valueOf(INITIAL_SCHEMA_VERSION),
                                         schemaCh -> {
                                             SchemaDescriptor schemaDesc;
-                                    
+
                                             //TODO IGNITE-15747 Remove try-catch and force configuration
                                             // validation here to ensure a valid configuration passed to
                                             // prepareSchemaDescriptor() method.
@@ -767,7 +777,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                             } catch (IllegalArgumentException ex) {
                                                 throw new ConfigurationValidationException(ex.getMessage());
                                             }
-                                    
+
                                             schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
                                         }
                                 ));
@@ -775,24 +785,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             );
         }).exceptionally(t -> {
             Throwable ex = t.getCause();
-        
+
             if (ex instanceof TableAlreadyExistsException) {
                 tableAsync(name, false).thenAccept(table -> {
                     if (!exceptionWhenExist) {
                         tblFut.complete(table);
                     }
-                
+
                     removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
                 });
             } else {
                 LOG.error(LoggerMessageHelper.format("Table wasn't created [name={}]", name), t);
-            
+
                 removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
             }
-        
+
             return null;
         });
-    
+
         return tblFut;
     }
 
@@ -877,23 +887,23 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 };
 
                 listen(TableEvent.ALTER, clo);
-    
+
                 tablesCfg.tables().change(ch -> ch.createOrUpdate(name, tblCh -> {
                             tableChange.accept(tblCh);
-                
+
                             ((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
                                     schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
                                         ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();
-                            
+
                                         SchemaDescriptor descriptor;
-                            
+
                                         //TODO IGNITE-15747 Remove try-catch and force configuration validation
                                         // here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
                                         try {
                                             descriptor = SchemaUtils.prepareSchemaDescriptor(
                                                     ((ExtendedTableView) tblCh).schemas().size(),
                                                     tblCh);
-                                
+
                                             descriptor.columnMapping(SchemaUtils.columnMapper(
                                                     tablesById.get(tblId).schemaView().schema(currTableView.schemas().size()),
                                                     currTableView,
@@ -905,20 +915,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                             // when bulk configuration update is applied.
                                             ConfigurationValidationException e =
                                                     new ConfigurationValidationException(ex.getMessage());
-                                
+
                                             e.addSuppressed(ex);
-                                
+
                                             throw e;
                                         }
-                            
+
                                         schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
                                     }));
                         }
                 )).exceptionally(t -> {
                     LOG.error(LoggerMessageHelper.format("Table wasn't altered [name={}]", name), t);
-        
+
                     removeListener(TableEvent.ALTER, clo, new IgniteInternalCheckedException(t.getCause()));
-        
+
                     return null;
                 });
             }
@@ -993,9 +1003,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         .change(change -> change.delete(name))
                         .exceptionally(t -> {
                             LOG.error(LoggerMessageHelper.format("Table wasn't dropped [name={}]", name), t);
-                            
+
                             removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(t.getCause()));
-                            
+
                             return null;
                         });
             }
@@ -1457,15 +1467,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
             List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
 
-            futures[i] = raftMgr.changePeers(
-                    raftGroupName(tblId, p),
-                    oldPartitionAssignment,
-                    newPartitionAssignment
-            ).exceptionally(th -> {
-                LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
-                        + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
-                return null;
-            });
+            try {
+                futures[i] = raftMgr.changePeers(
+                        raftGroupName(tblId, p),
+                        oldPartitionAssignment,
+                        newPartitionAssignment
+                ).exceptionally(th -> {
+                    LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
+                            + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
+                    return null;
+                });
+            } catch (NodeStoppingException e) {
+                throw new AssertionError("Loza was stopped before Table manager", e);
+            }
         }
 
         return CompletableFuture.allOf(futures);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 475e438..f790feb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -190,9 +191,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /**
      * Tests create a table through public API.
+     *
+     * @throws Exception If failed.
      */
     @Test
-    public void testCreateTable() {
+    public void testCreateTable() throws Exception {
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
                 SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
                 SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -207,9 +210,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /**
      * Tests drop a table through public API.
+     *
+     * @throws Exception If failed.
      */
     @Test
-    public void testDropTable() {
+    public void testDropTable() throws Exception {
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
                 SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
                 SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -302,9 +307,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /**
      * Cheks that the all RAFT nodes will be stopped when Table manager is stopping.
+     *
+     * @throws Exception If failed.
      */
     @Test
-    public void tableManagerStopTest() {
+    public void tableManagerStopTest() throws Exception {
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
                 SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
                 SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -330,12 +337,18 @@ public class TableManagerTest extends IgniteAbstractTest {
                 SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
                 SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
         ).withPrimaryKey("key").build();
-
+    
         Phaser phaser = new Phaser(2);
-
-        CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
-                mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
-        );
+    
+        CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() -> {
+            try {
+                return mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser);
+            } catch (NodeStoppingException e) {
+                fail(e.getMessage());
+            }
+            
+            return null;
+        });
 
         CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
             phaser.awaitAdvance(0);
@@ -362,9 +375,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /**
      * Tries to create a table that already exists.
+     *
+     * @throws Exception If failed.
      */
     @Test
-    public void testDoubledCreateTable() {
+    public void testDoubledCreateTable() throws Exception {
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
                 .columns(
                         SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
@@ -393,11 +408,12 @@ public class TableManagerTest extends IgniteAbstractTest {
      * @param tableDefinition Configuration schema for a table.
      * @param tblManagerFut   Future for table manager.
      * @return Table.
+     * @throws NodeStoppingException If something went wrong.
      */
     private TableImpl mockManagersAndCreateTable(
             TableDefinition tableDefinition,
             CompletableFuture<TableManager> tblManagerFut
-    ) {
+    ) throws NodeStoppingException {
         return mockManagersAndCreateTableWithDelay(tableDefinition, tblManagerFut, null);
     }
 
@@ -408,13 +424,14 @@ public class TableManagerTest extends IgniteAbstractTest {
      * @param tblManagerFut   Future for table manager.
      * @param phaser          Phaser for the wait.
      * @return Table manager.
+     * @throws NodeStoppingException If something went wrong.
      */
     @NotNull
     private TableImpl mockManagersAndCreateTableWithDelay(
             TableDefinition tableDefinition,
             CompletableFuture<TableManager> tblManagerFut,
             Phaser phaser
-    ) {
+    ) throws NodeStoppingException {
         when(rm.prepareRaftGroup(any(), any(), any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);