You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/29 19:23:47 UTC
[ignite-3] branch main updated: IGNITE-15711 Implemented Loza.stop(). Fixes #446
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 98b08b5 IGNITE-15711 Implemented Loza.stop(). Fixes #446
98b08b5 is described below
commit 98b08b5d0d9f7055a3c5d77b320d5d90ccd97d4c
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Nov 29 22:23:26 2021 +0300
IGNITE-15711 Implemented Loza.stop(). Fixes #446
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../internal/metastorage/MetaStorageManager.java | 20 ++--
.../java/org/apache/ignite/internal/raft/Loza.java | 105 ++++++++++++++++-
.../org/apache/ignite/internal/raft/LozaTest.java | 85 ++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 19 +--
.../internal/table/distributed/TableManager.java | 130 +++++++++++----------
.../ignite/internal/table/TableManagerTest.java | 39 +++++--
6 files changed, 299 insertions(+), 99 deletions(-)
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 56c3dd4..8c2b448 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -198,13 +198,17 @@ public class MetaStorageManager implements IgniteComponent {
}
storage.start();
-
- raftGroupServiceFut = raftMgr.prepareRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- metaStorageMembers,
- () -> new MetaStorageListener(storage)
- );
-
+
+ try {
+ raftGroupServiceFut = raftMgr.prepareRaftGroup(
+ METASTORAGE_RAFT_GROUP_NAME,
+ metaStorageMembers,
+ () -> new MetaStorageListener(storage)
+ );
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Meta Storage manager", e);
+ }
+
this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
);
@@ -283,6 +287,8 @@ public class MetaStorageManager implements IgniteComponent {
LOG.error("Failed to get meta storage raft group service.");
throw new IgniteInternalException(e);
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Meta Storage manager", e);
}
try {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index b525088..c73790d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -25,15 +25,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.LoggerMessageHelper;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
@@ -78,6 +81,12 @@ public class Loza implements IgniteComponent {
/** Executor for raft group services. */
private final ScheduledExecutorService executor;
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* The constructor.
*
@@ -123,7 +132,12 @@ public class Loza implements IgniteComponent {
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- // TODO: IGNITE-15161 Implement component's stop.
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
raftServer.stop();
@@ -140,13 +154,35 @@ public class Loza implements IgniteComponent {
* @param nodes Raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @return Future representing pending completion of the operation.
+ * @throws NodeStoppingException If node stopping intention was detected.
*/
@Experimental
public CompletableFuture<RaftGroupService> prepareRaftGroup(
String groupId,
List<ClusterNode> nodes,
Supplier<RaftGroupListener> lsnrSupplier
- ) {
+ ) throws NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Internal method to a raft group creation.
+ *
+ * @param groupId Raft group id.
+ * @param nodes Raft group nodes.
+ * @param lsnrSupplier Raft group listener supplier.
+ * @return Future representing pending completion of the operation.
+ */
+ private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String groupId, List<ClusterNode> nodes,
+ Supplier<RaftGroupListener> lsnrSupplier) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -187,6 +223,7 @@ public class Loza implements IgniteComponent {
* @param deltaNodes New raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @return Future representing pending completion of the operation.
+ * @throws NodeStoppingException If node stopping intention was detected.
*/
@Experimental
public CompletableFuture<RaftGroupService> updateRaftGroup(
@@ -194,7 +231,29 @@ public class Loza implements IgniteComponent {
Collection<ClusterNode> nodes,
Collection<ClusterNode> deltaNodes,
Supplier<RaftGroupListener> lsnrSupplier
- ) {
+ ) throws NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ return updateRaftGroupInternal(groupId, nodes, deltaNodes, lsnrSupplier);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Internal method for updating a raft group.
+ *
+ * @param groupId Raft group id.
+ * @param nodes Full set of raft group nodes.
+ * @param deltaNodes New raft group nodes.
+ * @param lsnrSupplier Raft group listener supplier.
+ * @return Future representing pending completion of the operation.
+ */
+ private CompletableFuture<RaftGroupService> updateRaftGroupInternal(String groupId, Collection<ClusterNode> nodes,
+ Collection<ClusterNode> deltaNodes, Supplier<RaftGroupListener> lsnrSupplier) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -230,8 +289,33 @@ public class Loza implements IgniteComponent {
* @param expectedNodes List of nodes that contains the raft group peers.
* @param changedNodes List of nodes that will contain the raft group peers after.
* @return Future which will complete when peers change.
+ * @throws NodeStoppingException If node stopping intention was detected.
*/
- public CompletableFuture<Void> changePeers(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
+ public CompletableFuture<Void> changePeers(
+ String groupId,
+ List<ClusterNode> expectedNodes,
+ List<ClusterNode> changedNodes
+ ) throws NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ return changePeersInternal(groupId, expectedNodes, changedNodes);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Internal method for changing peers for a RAFT group.
+ *
+ * @param groupId Raft group id.
+ * @param expectedNodes List of nodes that contains the raft group peers.
+ * @param changedNodes List of nodes that will contain the raft group peers after.
+ * @return Future which will complete when peers change.
+ */
+ private CompletableFuture<Void> changePeersInternal(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
List<Peer> expectedPeers = expectedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
List<Peer> changedPeers = changedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -253,9 +337,18 @@ public class Loza implements IgniteComponent {
* Stops a raft group on the current node.
*
* @param groupId Raft group id.
+ * @throws NodeStoppingException If node stopping intention was detected.
*/
- public void stopRaftGroup(String groupId) {
- raftServer.stopRaftGroup(groupId);
+ public void stopRaftGroup(String groupId) throws NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ raftServer.stopRaftGroup(groupId);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
new file mode 100644
index 0000000..ad1dc2f
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * There are tests for RAFT manager.
+ * It is mocking all components except Loza and checks API methods of the component in various conditions.
+ */
+@ExtendWith(MockitoExtension.class)
+public class LozaTest extends IgniteAbstractTest {
+ /** Mock for network service. */
+ @Mock
+ private ClusterService clusterNetSvc;
+
+ /**
+ * Checks that the all API methods throw the exception ({@link org.apache.ignite.lang.NodeStoppingException})
+ * when Loza is closed.
+ *
+ * @throws Exception If fail.
+ */
+ @Test
+ public void testLozaStop() throws Exception {
+ Mockito.doReturn(new ClusterLocalConfiguration("test_node", null)).when(clusterNetSvc).localConfiguration();
+ Mockito.doReturn(Mockito.mock(MessagingService.class)).when(clusterNetSvc).messagingService();
+ Mockito.doReturn(Mockito.mock(TopologyService.class)).when(clusterNetSvc).topologyService();
+
+ Loza loza = new Loza(clusterNetSvc, workDir);
+
+ loza.start();
+
+ loza.beforeNodeStop();
+ loza.stop();
+
+ String raftGroupId = "test_raft_group";
+
+ List<ClusterNode> nodes = List.of(
+ new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:123")));
+
+ List<ClusterNode> newNodes = List.of(
+ new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:124")),
+ new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:125")));
+
+ Supplier<RaftGroupListener> lsnrSupplier = () -> null;
+
+ assertThrows(NodeStoppingException.class, () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier));
+ assertThrows(NodeStoppingException.class, () -> loza.stopRaftGroup(raftGroupId));
+ assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier));
+ assertThrows(NodeStoppingException.class, () -> loza.changePeers(raftGroupId, nodes, newNodes));
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bdd66aa..99ef3a4 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
@@ -331,21 +330,9 @@ public class IgniteImpl implements Ignite {
* Stops ignite node.
*/
public void stop() {
- AtomicBoolean explicitStop = new AtomicBoolean();
-
- status.getAndUpdate(status -> {
- if (status == Status.STARTED) {
- explicitStop.set(true);
- } else {
- explicitStop.set(false);
- }
-
- return Status.STOPPING;
- });
-
- if (explicitStop.get()) {
- doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
- distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
+ if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
+ doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, metaStorageMgr, clusterCfgMgr, baselineMgr,
+ distributedTblMgr, qryEngine, restModule, clientHandlerModule));
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 98e51d0..8f27d1a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -328,20 +328,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
InternalTable internalTable = tablesById.get(tblId).internalTable();
// Create new raft nodes according to new assignments.
- futures[i] = raftMgr.updateRaftGroup(
- raftGroupName(tblId, partId),
- newPartitionAssignment,
- toAdd,
- () -> new PartitionListener(tblId,
- new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
- ).thenAccept(
- updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
- partId, updatedRaftGroupService)
- ).exceptionally(th -> {
- LOG.error("Failed to update raft groups one the node", th);
-
- return null;
- });
+ try {
+ futures[i] = raftMgr.updateRaftGroup(
+ raftGroupName(tblId, partId),
+ newPartitionAssignment,
+ toAdd,
+ () -> new PartitionListener(tblId,
+ new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
+ ).thenAccept(
+ updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
+ partId, updatedRaftGroupService)
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft groups one the node", th);
+
+ return null;
+ });
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Table manager.", e);
+ }
}
return CompletableFuture.allOf(futures);
@@ -493,13 +497,17 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
for (int p = 0; p < partitions; p++) {
int partId = p;
- partitionsGroupsFutures.add(
- raftMgr.prepareRaftGroup(
- raftGroupName(tblId, p),
- assignment.get(p),
- () -> new PartitionListener(tblId, new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
- )
- );
+ try {
+ partitionsGroupsFutures.add(
+ raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, p),
+ assignment.get(p),
+ () -> new PartitionListener(tblId, new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
+ )
+ );
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Table manager", e);
+ }
}
CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
@@ -706,43 +714,43 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
boolean exceptionWhenExist
) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
+
IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
-
+
EventListener<TableEventParameters> clo = new EventListener<>() {
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
IgniteUuid notificationTblId = parameters.tableId();
-
+
if (!tblId.equals(notificationTblId)) {
return false;
}
-
+
if (e == null) {
tblFut.complete(parameters.table());
} else {
tblFut.completeExceptionally(e);
}
-
+
return true;
}
-
+
@Override
public void remove(@NotNull Throwable e) {
tblFut.completeExceptionally(e);
}
};
-
+
listen(TableEvent.CREATE, clo);
-
+
tablesCfg.tables().change(change -> {
if (change.get(name) != null) {
throw new TableAlreadyExistsException(name);
}
-
+
change.create(name, (ch) -> {
tableInitChange.accept(ch);
-
+
((ExtendedTableChange) ch)
// Table id specification.
.changeId(tblId.toString())
@@ -756,7 +764,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String.valueOf(INITIAL_SCHEMA_VERSION),
schemaCh -> {
SchemaDescriptor schemaDesc;
-
+
//TODO IGNITE-15747 Remove try-catch and force configuration
// validation here to ensure a valid configuration passed to
// prepareSchemaDescriptor() method.
@@ -767,7 +775,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
} catch (IllegalArgumentException ex) {
throw new ConfigurationValidationException(ex.getMessage());
}
-
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
}
));
@@ -775,24 +783,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
);
}).exceptionally(t -> {
Throwable ex = t.getCause();
-
+
if (ex instanceof TableAlreadyExistsException) {
tableAsync(name, false).thenAccept(table -> {
if (!exceptionWhenExist) {
tblFut.complete(table);
}
-
+
removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
});
} else {
LOG.error(LoggerMessageHelper.format("Table wasn't created [name={}]", name), t);
-
+
removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
}
-
+
return null;
});
-
+
return tblFut;
}
@@ -877,23 +885,23 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
};
listen(TableEvent.ALTER, clo);
-
+
tablesCfg.tables().change(ch -> ch.createOrUpdate(name, tblCh -> {
tableChange.accept(tblCh);
-
+
((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();
-
+
SchemaDescriptor descriptor;
-
+
//TODO IGNITE-15747 Remove try-catch and force configuration validation
// here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
try {
descriptor = SchemaUtils.prepareSchemaDescriptor(
((ExtendedTableView) tblCh).schemas().size(),
tblCh);
-
+
descriptor.columnMapping(SchemaUtils.columnMapper(
tablesById.get(tblId).schemaView().schema(currTableView.schemas().size()),
currTableView,
@@ -905,20 +913,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
// when bulk configuration update is applied.
ConfigurationValidationException e =
new ConfigurationValidationException(ex.getMessage());
-
+
e.addSuppressed(ex);
-
+
throw e;
}
-
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
}));
}
)).exceptionally(t -> {
LOG.error(LoggerMessageHelper.format("Table wasn't altered [name={}]", name), t);
-
+
removeListener(TableEvent.ALTER, clo, new IgniteInternalCheckedException(t.getCause()));
-
+
return null;
});
}
@@ -993,9 +1001,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.change(change -> change.delete(name))
.exceptionally(t -> {
LOG.error(LoggerMessageHelper.format("Table wasn't dropped [name={}]", name), t);
-
+
removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(t.getCause()));
-
+
return null;
});
}
@@ -1457,15 +1465,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
- futures[i] = raftMgr.changePeers(
- raftGroupName(tblId, p),
- oldPartitionAssignment,
- newPartitionAssignment
- ).exceptionally(th -> {
- LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
- + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
- return null;
- });
+ try {
+ futures[i] = raftMgr.changePeers(
+ raftGroupName(tblId, p),
+ oldPartitionAssignment,
+ newPartitionAssignment
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
+ + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
+ return null;
+ });
+ } catch (NodeStoppingException e) {
+ throw new AssertionError("Loza was stopped before Table manager", e);
+ }
}
return CompletableFuture.allOf(futures);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 475e438..f790feb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -190,9 +191,11 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tests create a table through public API.
+ *
+ * @throws Exception If failed.
*/
@Test
- public void testCreateTable() {
+ public void testCreateTable() throws Exception {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -207,9 +210,11 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tests drop a table through public API.
+ *
+ * @throws Exception If failed.
*/
@Test
- public void testDropTable() {
+ public void testDropTable() throws Exception {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -302,9 +307,11 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Cheks that the all RAFT nodes will be stopped when Table manager is stopping.
+ *
+ * @throws Exception If failed.
*/
@Test
- public void tableManagerStopTest() {
+ public void tableManagerStopTest() throws Exception {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -330,12 +337,18 @@ public class TableManagerTest extends IgniteAbstractTest {
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
-
+
Phaser phaser = new Phaser(2);
-
- CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
- mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
- );
+
+ CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() -> {
+ try {
+ return mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser);
+ } catch (NodeStoppingException e) {
+ fail(e.getMessage());
+ }
+
+ return null;
+ });
CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
phaser.awaitAdvance(0);
@@ -362,9 +375,11 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tries to create a table that already exists.
+ *
+ * @throws Exception If failed.
*/
@Test
- public void testDoubledCreateTable() {
+ public void testDoubledCreateTable() throws Exception {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
.columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
@@ -393,11 +408,12 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tableDefinition Configuration schema for a table.
* @param tblManagerFut Future for table manager.
* @return Table.
+ * @throws NodeStoppingException If something went wrong.
*/
private TableImpl mockManagersAndCreateTable(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut
- ) {
+ ) throws NodeStoppingException {
return mockManagersAndCreateTableWithDelay(tableDefinition, tblManagerFut, null);
}
@@ -408,13 +424,14 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tblManagerFut Future for table manager.
* @param phaser Phaser for the wait.
* @return Table manager.
+ * @throws NodeStoppingException If something went wrong.
*/
@NotNull
private TableImpl mockManagersAndCreateTableWithDelay(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
- ) {
+ ) throws NodeStoppingException {
when(rm.prepareRaftGroup(any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);