You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/30 11:43:09 UTC
[ignite-3] branch main updated: Revert "IGNITE-15711 Implemented Loza.stop(). Fixes #446"
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 24b18c9 Revert "IGNITE-15711 Implemented Loza.stop(). Fixes #446"
24b18c9 is described below
commit 24b18c97637b3e79f8efc49bc69ad0507676a870
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue Nov 30 14:42:27 2021 +0300
Revert "IGNITE-15711 Implemented Loza.stop(). Fixes #446"
This reverts commit 98b08b5d
---
.../internal/metastorage/MetaStorageManager.java | 20 ++--
.../java/org/apache/ignite/internal/raft/Loza.java | 105 +----------------
.../org/apache/ignite/internal/raft/LozaTest.java | 85 --------------
.../org/apache/ignite/internal/app/IgniteImpl.java | 19 ++-
.../internal/table/distributed/TableManager.java | 130 ++++++++++-----------
.../ignite/internal/table/TableManagerTest.java | 39 ++-----
6 files changed, 99 insertions(+), 299 deletions(-)
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 8c2b448..56c3dd4 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -198,17 +198,13 @@ public class MetaStorageManager implements IgniteComponent {
}
storage.start();
-
- try {
- raftGroupServiceFut = raftMgr.prepareRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- metaStorageMembers,
- () -> new MetaStorageListener(storage)
- );
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Meta Storage manager", e);
- }
-
+
+ raftGroupServiceFut = raftMgr.prepareRaftGroup(
+ METASTORAGE_RAFT_GROUP_NAME,
+ metaStorageMembers,
+ () -> new MetaStorageListener(storage)
+ );
+
this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
);
@@ -287,8 +283,6 @@ public class MetaStorageManager implements IgniteComponent {
LOG.error("Failed to get meta storage raft group service.");
throw new IgniteInternalException(e);
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Meta Storage manager", e);
}
try {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index c73790d..b525088 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -25,18 +25,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.LoggerMessageHelper;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
@@ -81,12 +78,6 @@ public class Loza implements IgniteComponent {
/** Executor for raft group services. */
private final ScheduledExecutorService executor;
- /** Busy lock to stop synchronously. */
- private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
- /** Prevents double stopping the component. */
- private final AtomicBoolean stopGuard = new AtomicBoolean();
-
/**
* The constructor.
*
@@ -132,12 +123,7 @@ public class Loza implements IgniteComponent {
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- if (!stopGuard.compareAndSet(false, true)) {
- return;
- }
-
- busyLock.block();
-
+ // TODO: IGNITE-15161 Implement component's stop.
IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
raftServer.stop();
@@ -154,35 +140,13 @@ public class Loza implements IgniteComponent {
* @param nodes Raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @return Future representing pending completion of the operation.
- * @throws NodeStoppingException If node stopping intention was detected.
*/
@Experimental
public CompletableFuture<RaftGroupService> prepareRaftGroup(
String groupId,
List<ClusterNode> nodes,
Supplier<RaftGroupListener> lsnrSupplier
- ) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * Internal method to a raft group creation.
- *
- * @param groupId Raft group id.
- * @param nodes Raft group nodes.
- * @param lsnrSupplier Raft group listener supplier.
- * @return Future representing pending completion of the operation.
- */
- private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String groupId, List<ClusterNode> nodes,
- Supplier<RaftGroupListener> lsnrSupplier) {
+ ) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -223,7 +187,6 @@ public class Loza implements IgniteComponent {
* @param deltaNodes New raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @return Future representing pending completion of the operation.
- * @throws NodeStoppingException If node stopping intention was detected.
*/
@Experimental
public CompletableFuture<RaftGroupService> updateRaftGroup(
@@ -231,29 +194,7 @@ public class Loza implements IgniteComponent {
Collection<ClusterNode> nodes,
Collection<ClusterNode> deltaNodes,
Supplier<RaftGroupListener> lsnrSupplier
- ) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- return updateRaftGroupInternal(groupId, nodes, deltaNodes, lsnrSupplier);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * Internal method for updating a raft group.
- *
- * @param groupId Raft group id.
- * @param nodes Full set of raft group nodes.
- * @param deltaNodes New raft group nodes.
- * @param lsnrSupplier Raft group listener supplier.
- * @return Future representing pending completion of the operation.
- */
- private CompletableFuture<RaftGroupService> updateRaftGroupInternal(String groupId, Collection<ClusterNode> nodes,
- Collection<ClusterNode> deltaNodes, Supplier<RaftGroupListener> lsnrSupplier) {
+ ) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -289,33 +230,8 @@ public class Loza implements IgniteComponent {
* @param expectedNodes List of nodes that contains the raft group peers.
* @param changedNodes List of nodes that will contain the raft group peers after.
* @return Future which will complete when peers change.
- * @throws NodeStoppingException If node stopping intention was detected.
*/
- public CompletableFuture<Void> changePeers(
- String groupId,
- List<ClusterNode> expectedNodes,
- List<ClusterNode> changedNodes
- ) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- return changePeersInternal(groupId, expectedNodes, changedNodes);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * Internal method for changing peers for a RAFT group.
- *
- * @param groupId Raft group id.
- * @param expectedNodes List of nodes that contains the raft group peers.
- * @param changedNodes List of nodes that will contain the raft group peers after.
- * @return Future which will complete when peers change.
- */
- private CompletableFuture<Void> changePeersInternal(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
+ public CompletableFuture<Void> changePeers(String groupId, List<ClusterNode> expectedNodes, List<ClusterNode> changedNodes) {
List<Peer> expectedPeers = expectedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
List<Peer> changedPeers = changedNodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -337,18 +253,9 @@ public class Loza implements IgniteComponent {
* Stops a raft group on the current node.
*
* @param groupId Raft group id.
- * @throws NodeStoppingException If node stopping intention was detected.
*/
- public void stopRaftGroup(String groupId) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- raftServer.stopRaftGroup(groupId);
- } finally {
- busyLock.leaveBusy();
- }
+ public void stopRaftGroup(String groupId) {
+ raftServer.stopRaftGroup(groupId);
}
/**
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
deleted file mode 100644
index ad1dc2f..0000000
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.raft;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterLocalConfiguration;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.TopologyService;
-import org.apache.ignite.raft.client.service.RaftGroupListener;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-/**
- * There are tests for RAFT manager.
- * It is mocking all components except Loza and checks API methods of the component in various conditions.
- */
-@ExtendWith(MockitoExtension.class)
-public class LozaTest extends IgniteAbstractTest {
- /** Mock for network service. */
- @Mock
- private ClusterService clusterNetSvc;
-
- /**
- * Checks that the all API methods throw the exception ({@link org.apache.ignite.lang.NodeStoppingException})
- * when Loza is closed.
- *
- * @throws Exception If fail.
- */
- @Test
- public void testLozaStop() throws Exception {
- Mockito.doReturn(new ClusterLocalConfiguration("test_node", null)).when(clusterNetSvc).localConfiguration();
- Mockito.doReturn(Mockito.mock(MessagingService.class)).when(clusterNetSvc).messagingService();
- Mockito.doReturn(Mockito.mock(TopologyService.class)).when(clusterNetSvc).topologyService();
-
- Loza loza = new Loza(clusterNetSvc, workDir);
-
- loza.start();
-
- loza.beforeNodeStop();
- loza.stop();
-
- String raftGroupId = "test_raft_group";
-
- List<ClusterNode> nodes = List.of(
- new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:123")));
-
- List<ClusterNode> newNodes = List.of(
- new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:124")),
- new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:125")));
-
- Supplier<RaftGroupListener> lsnrSupplier = () -> null;
-
- assertThrows(NodeStoppingException.class, () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier));
- assertThrows(NodeStoppingException.class, () -> loza.stopRaftGroup(raftGroupId));
- assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier));
- assertThrows(NodeStoppingException.class, () -> loza.changePeers(raftGroupId, nodes, newNodes));
- }
-}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 99ef3a4..bdd66aa 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
@@ -330,9 +331,21 @@ public class IgniteImpl implements Ignite {
* Stops ignite node.
*/
public void stop() {
- if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
- doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, metaStorageMgr, clusterCfgMgr, baselineMgr,
- distributedTblMgr, qryEngine, restModule, clientHandlerModule));
+ AtomicBoolean explicitStop = new AtomicBoolean();
+
+ status.getAndUpdate(status -> {
+ if (status == Status.STARTED) {
+ explicitStop.set(true);
+ } else {
+ explicitStop.set(false);
+ }
+
+ return Status.STOPPING;
+ });
+
+ if (explicitStop.get()) {
+ doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
+ distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 8f27d1a..98e51d0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -328,24 +328,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
InternalTable internalTable = tablesById.get(tblId).internalTable();
// Create new raft nodes according to new assignments.
- try {
- futures[i] = raftMgr.updateRaftGroup(
- raftGroupName(tblId, partId),
- newPartitionAssignment,
- toAdd,
- () -> new PartitionListener(tblId,
- new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
- ).thenAccept(
- updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
- partId, updatedRaftGroupService)
- ).exceptionally(th -> {
- LOG.error("Failed to update raft groups one the node", th);
-
- return null;
- });
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Table manager.", e);
- }
+ futures[i] = raftMgr.updateRaftGroup(
+ raftGroupName(tblId, partId),
+ newPartitionAssignment,
+ toAdd,
+ () -> new PartitionListener(tblId,
+ new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
+ ).thenAccept(
+ updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
+ partId, updatedRaftGroupService)
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft groups one the node", th);
+
+ return null;
+ });
}
return CompletableFuture.allOf(futures);
@@ -497,17 +493,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
for (int p = 0; p < partitions; p++) {
int partId = p;
- try {
- partitionsGroupsFutures.add(
- raftMgr.prepareRaftGroup(
- raftGroupName(tblId, p),
- assignment.get(p),
- () -> new PartitionListener(tblId, new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
- )
- );
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Table manager", e);
- }
+ partitionsGroupsFutures.add(
+ raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, p),
+ assignment.get(p),
+ () -> new PartitionListener(tblId, new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
+ )
+ );
}
CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
@@ -714,43 +706,43 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
boolean exceptionWhenExist
) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
+
IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
-
+
EventListener<TableEventParameters> clo = new EventListener<>() {
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
IgniteUuid notificationTblId = parameters.tableId();
-
+
if (!tblId.equals(notificationTblId)) {
return false;
}
-
+
if (e == null) {
tblFut.complete(parameters.table());
} else {
tblFut.completeExceptionally(e);
}
-
+
return true;
}
-
+
@Override
public void remove(@NotNull Throwable e) {
tblFut.completeExceptionally(e);
}
};
-
+
listen(TableEvent.CREATE, clo);
-
+
tablesCfg.tables().change(change -> {
if (change.get(name) != null) {
throw new TableAlreadyExistsException(name);
}
-
+
change.create(name, (ch) -> {
tableInitChange.accept(ch);
-
+
((ExtendedTableChange) ch)
// Table id specification.
.changeId(tblId.toString())
@@ -764,7 +756,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String.valueOf(INITIAL_SCHEMA_VERSION),
schemaCh -> {
SchemaDescriptor schemaDesc;
-
+
//TODO IGNITE-15747 Remove try-catch and force configuration
// validation here to ensure a valid configuration passed to
// prepareSchemaDescriptor() method.
@@ -775,7 +767,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
} catch (IllegalArgumentException ex) {
throw new ConfigurationValidationException(ex.getMessage());
}
-
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
}
));
@@ -783,24 +775,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
);
}).exceptionally(t -> {
Throwable ex = t.getCause();
-
+
if (ex instanceof TableAlreadyExistsException) {
tableAsync(name, false).thenAccept(table -> {
if (!exceptionWhenExist) {
tblFut.complete(table);
}
-
+
removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
});
} else {
LOG.error(LoggerMessageHelper.format("Table wasn't created [name={}]", name), t);
-
+
removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(ex));
}
-
+
return null;
});
-
+
return tblFut;
}
@@ -885,23 +877,23 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
};
listen(TableEvent.ALTER, clo);
-
+
tablesCfg.tables().change(ch -> ch.createOrUpdate(name, tblCh -> {
tableChange.accept(tblCh);
-
+
((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();
-
+
SchemaDescriptor descriptor;
-
+
//TODO IGNITE-15747 Remove try-catch and force configuration validation
// here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
try {
descriptor = SchemaUtils.prepareSchemaDescriptor(
((ExtendedTableView) tblCh).schemas().size(),
tblCh);
-
+
descriptor.columnMapping(SchemaUtils.columnMapper(
tablesById.get(tblId).schemaView().schema(currTableView.schemas().size()),
currTableView,
@@ -913,20 +905,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
// when bulk configuration update is applied.
ConfigurationValidationException e =
new ConfigurationValidationException(ex.getMessage());
-
+
e.addSuppressed(ex);
-
+
throw e;
}
-
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
}));
}
)).exceptionally(t -> {
LOG.error(LoggerMessageHelper.format("Table wasn't altered [name={}]", name), t);
-
+
removeListener(TableEvent.ALTER, clo, new IgniteInternalCheckedException(t.getCause()));
-
+
return null;
});
}
@@ -1001,9 +993,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.change(change -> change.delete(name))
.exceptionally(t -> {
LOG.error(LoggerMessageHelper.format("Table wasn't dropped [name={}]", name), t);
-
+
removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(t.getCause()));
-
+
return null;
});
}
@@ -1465,19 +1457,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
- try {
- futures[i] = raftMgr.changePeers(
- raftGroupName(tblId, p),
- oldPartitionAssignment,
- newPartitionAssignment
- ).exceptionally(th -> {
- LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
- + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
- return null;
- });
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before Table manager", e);
- }
+ futures[i] = raftMgr.changePeers(
+ raftGroupName(tblId, p),
+ oldPartitionAssignment,
+ newPartitionAssignment
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p)
+ + "from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
+ return null;
+ });
}
return CompletableFuture.allOf(futures);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index f790feb..475e438 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -24,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -191,11 +190,9 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tests create a table through public API.
- *
- * @throws Exception If failed.
*/
@Test
- public void testCreateTable() throws Exception {
+ public void testCreateTable() {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -210,11 +207,9 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tests drop a table through public API.
- *
- * @throws Exception If failed.
*/
@Test
- public void testDropTable() throws Exception {
+ public void testDropTable() {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -307,11 +302,9 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Cheks that the all RAFT nodes will be stopped when Table manager is stopping.
- *
- * @throws Exception If failed.
*/
@Test
- public void tableManagerStopTest() throws Exception {
+ public void tableManagerStopTest() {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
@@ -337,18 +330,12 @@ public class TableManagerTest extends IgniteAbstractTest {
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
).withPrimaryKey("key").build();
-
+
Phaser phaser = new Phaser(2);
-
- CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() -> {
- try {
- return mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser);
- } catch (NodeStoppingException e) {
- fail(e.getMessage());
- }
-
- return null;
- });
+
+ CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
+ mockManagersAndCreateTableWithDelay(scmTbl, tblManagerFut, phaser)
+ );
CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
phaser.awaitAdvance(0);
@@ -375,11 +362,9 @@ public class TableManagerTest extends IgniteAbstractTest {
/**
* Tries to create a table that already exists.
- *
- * @throws Exception If failed.
*/
@Test
- public void testDoubledCreateTable() throws Exception {
+ public void testDoubledCreateTable() {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
.columns(
SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
@@ -408,12 +393,11 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tableDefinition Configuration schema for a table.
* @param tblManagerFut Future for table manager.
* @return Table.
- * @throws NodeStoppingException If something went wrong.
*/
private TableImpl mockManagersAndCreateTable(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut
- ) throws NodeStoppingException {
+ ) {
return mockManagersAndCreateTableWithDelay(tableDefinition, tblManagerFut, null);
}
@@ -424,14 +408,13 @@ public class TableManagerTest extends IgniteAbstractTest {
* @param tblManagerFut Future for table manager.
* @param phaser Phaser for the wait.
* @return Table manager.
- * @throws NodeStoppingException If something went wrong.
*/
@NotNull
private TableImpl mockManagersAndCreateTableWithDelay(
TableDefinition tableDefinition,
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
- ) throws NodeStoppingException {
+ ) {
when(rm.prepareRaftGroup(any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);