You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/08/24 09:44:24 UTC
[ignite-3] branch main updated: IGNITE-15278 Node start/stop
refactoring. Fixes #293
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3da7311 IGNITE-15278 Node start/stop refactoring. Fixes #293
3da7311 is described below
commit 3da73114c3ac230d4034be9468b7ed26ad6f6217
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Tue Aug 24 12:44:02 2021 +0300
IGNITE-15278 Node start/stop refactoring. Fixes #293
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
modules/metastorage-client/pom.xml | 7 +
.../ITMetaStorageServicePersistenceTest.java | 12 +-
.../client/ITMetaStorageServiceTest.java | 34 +-
modules/network-api/pom.xml | 5 +
.../ignite/network/ClusterLocalConfiguration.java | 32 +-
.../ignite/network/ClusterServiceFactory.java | 11 +-
modules/network/pom.xml | 7 +
.../network/scalecube/ITNodeRestartsTest.java | 12 +-
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 26 +-
.../ignite/utils/ClusterServiceTestUtils.java | 103 +++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 74 ++--
modules/raft/pom.xml | 7 +
.../ignite/raft/jraft/core/ITCliServiceTest.java | 14 +-
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 24 +-
.../ignite/raft/server/RaftServerAbstractTest.java | 12 +-
.../apache/ignite/raft/jraft/core/TestCluster.java | 23 +-
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 35 +-
modules/runner/pom.xml | 7 +
.../runner/app/ITThinClientConnectionTest.java | 48 +--
.../ITDistributedConfigurationStorageTest.java | 22 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 418 +++++++++++++++++++-
.../apache/ignite/internal/app/IgnitionImpl.java | 425 +--------------------
.../ignite/distributed/ITDistributedTableTest.java | 13 +-
23 files changed, 734 insertions(+), 637 deletions(-)
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index 10ca77b..b2008f9 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -89,6 +89,13 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
index 6908980..fc5632b 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -51,6 +50,7 @@ import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -351,9 +351,13 @@ public class ITMetaStorageServicePersistenceTest {
private ClusterService clusterService(String name, int port, NetworkAddress otherPeer) {
var nodeFinder = new StaticNodeFinder(List.of(otherPeer));
- var context = new ClusterLocalConfiguration(name, port, nodeFinder, SERIALIZATION_REGISTRY);
-
- var network = NETWORK_FACTORY.createClusterService(context);
+ var network = ClusterServiceTestUtils.clusterService(
+ name,
+ port,
+ nodeFinder,
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ );
network.start();
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index fc9fad2..c67f78a 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -43,19 +43,17 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -181,8 +179,19 @@ public class ITMetaStorageServiceTest {
var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
nodeFinder.findNodes().stream()
- .map(addr -> startClusterNode(addr, nodeFinder))
- .forEach(cluster::add);
+ .map(
+ addr -> ClusterServiceTestUtils.clusterService(
+ addr.toString(),
+ addr.port(),
+ nodeFinder,
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ )
+ )
+ .forEach(clusterService -> {
+ clusterService.start();
+ cluster.add(clusterService);
+ });
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
@@ -1047,21 +1056,6 @@ public class ITMetaStorageServiceTest {
}
/**
- * @param addr Node address.
- * @param nodeFinder Node finder.
- * @return The client cluster view.
- */
- private static ClusterService startClusterNode(NetworkAddress addr, NodeFinder nodeFinder) {
- var ctx = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, SERIALIZATION_REGISTRY);
-
- var net = NETWORK_FACTORY.createClusterService(ctx);
-
- net.start();
-
- return net;
- }
-
- /**
* @param cluster The cluster.
* @param exp Expected count.
* @param timeout The timeout in millis.
diff --git a/modules/network-api/pom.xml b/modules/network-api/pom.xml
index c8514cf..42312ff 100644
--- a/modules/network-api/pom.xml
+++ b/modules/network-api/pom.xml
@@ -35,6 +35,11 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index b3f5544..52a7fdd 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -29,12 +29,6 @@ public class ClusterLocalConfiguration {
/** The network alias of a node. */
private final String name;
- /** The port. */
- private final int port;
-
- /** Node finder. */
- private final NodeFinder nodeFinder;
-
/** Message mapper providers. */
private final MessageSerializationRegistry serializationRegistry;
@@ -42,16 +36,10 @@ public class ClusterLocalConfiguration {
* Constructor.
*
* @param name Local name.
- * @param port Local port.
- * @param nodeFinder Node finder for discovering the initial cluster members.
* @param serializationRegistry Message serialization registry.
*/
- public ClusterLocalConfiguration(
- String name, int port, NodeFinder nodeFinder, MessageSerializationRegistry serializationRegistry
- ) {
+ public ClusterLocalConfiguration(String name, MessageSerializationRegistry serializationRegistry) {
this.name = name;
- this.port = port;
- this.nodeFinder = nodeFinder;
this.serializationRegistry = serializationRegistry;
}
@@ -65,24 +53,6 @@ public class ClusterLocalConfiguration {
}
/**
- * Returns the local network port of the node.
- *
- * @return Port.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * Returns the Node Finder for discovering the initial cluster members.
- *
- * @return Node Finder.
- */
- public NodeFinder getNodeFinder() {
- return nodeFinder;
- }
-
- /**
* Returns the message serialization registry.
*
* @return Message serialization registry.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
index c149208..f0f1076 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -16,6 +16,9 @@
*/
package org.apache.ignite.network;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+
/**
* Cluster service factory.
*/
@@ -24,7 +27,13 @@ public interface ClusterServiceFactory {
* Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
*
* @param context Cluster context.
+ * @param nodeConfiguration Node configuration.
+ * @param nodeFinderSupplier Supplier that provides node finder for discovering the initial cluster members.
* @return New cluster service.
*/
- ClusterService createClusterService(ClusterLocalConfiguration context);
+ ClusterService createClusterService(
+ ClusterLocalConfiguration context,
+ ConfigurationManager nodeConfiguration,
+ Supplier<NodeFinder> nodeFinderSupplier
+ );
}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 663d747..c79a3e8 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -73,6 +73,13 @@
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index b78cd06..9144989 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.LocalPortRangeNodeFinder;
@@ -28,6 +27,7 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -109,9 +109,13 @@ class ITNodeRestartsTest {
* @return Created Cluster Service.
*/
private ClusterService startNetwork(NetworkAddress addr, NodeFinder nodeFinder) {
- var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
-
- ClusterService clusterService = networkFactory.createClusterService(context);
+ ClusterService clusterService = ClusterServiceTestUtils.clusterService(
+ addr.toString(),
+ addr.port(),
+ nodeFinder,
+ serializationRegistry,
+ networkFactory
+ );
clusterService.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 1b5bc5a..3734578 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -33,7 +33,6 @@ import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
@@ -48,6 +47,7 @@ import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.annotations.MessageGroup;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
@@ -383,10 +383,15 @@ class ITScaleCubeNetworkMessagingTest {
* @throws Exception If failed to stop.
*/
private static void stopForcefully(ClusterService cluster) throws Exception {
- Field clusterImplField = cluster.getClass().getDeclaredField("val$cluster");
+ Field clusterSvcImplField = cluster.getClass().getDeclaredField("val$clusterSvc");
+ clusterSvcImplField.setAccessible(true);
+
+ ClusterService innerClusterSvc = (ClusterService) clusterSvcImplField.get(cluster);
+
+ Field clusterImplField = innerClusterSvc.getClass().getDeclaredField("cluster");
clusterImplField.setAccessible(true);
- ClusterImpl clusterImpl = (ClusterImpl) clusterImplField.get(cluster);
+ ClusterImpl clusterImpl = (ClusterImpl) clusterImplField.get(innerClusterSvc);
Field transportField = clusterImpl.getClass().getDeclaredField("transport");
transportField.setAccessible(true);
@@ -442,13 +447,16 @@ class ITScaleCubeNetworkMessagingTest {
* @return Started cluster node.
*/
private ClusterService startNode(NetworkAddress addr, NodeFinder nodeFinder, boolean initial) {
- var context =
- new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
-
- ClusterService clusterService = networkFactory.createClusterService(context);
+ ClusterService clusterSvc = ClusterServiceTestUtils.clusterService(
+ addr.toString(),
+ addr.port(),
+ nodeFinder,
+ serializationRegistry,
+ networkFactory
+ );
if (initial)
- clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+ clusterSvc.topologyService().addEventHandler(new TopologyEventHandler() {
/** {@inheritDoc} */
@Override public void onAppeared(ClusterNode member) {
startupLatch.countDown();
@@ -459,7 +467,7 @@ class ITScaleCubeNetworkMessagingTest {
}
});
- return clusterService;
+ return clusterSvc;
}
/**
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
new file mode 100644
index 0000000..2767b0a
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.utils;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+
+/**
+ * Test utils that provide sort of cluster service mock that manages required node configuration internally.
+ */
+public class ClusterServiceTestUtils {
+ /**
+ * Creates a cluster service and required node configuration manager beneath it.
+ * Populates node configuration with specified port.
+ * Manages configuration manager lifecycle: on cluster service start starts node configuration manager,
+ * on cluster service stop - stops node configuration manager.
+ *
+ * @param nodeName Local name.
+ * @param port Local port.
+ * @param nodeFinder Node finder for discovering the initial cluster members.
+ * @param msgSerializationRegistry Message serialization registry.
+ * @param clusterSvcFactory Cluster service factory.
+ */
+ public static ClusterService clusterService(
+ String nodeName,
+ int port,
+ NodeFinder nodeFinder,
+ MessageSerializationRegistry msgSerializationRegistry,
+ ClusterServiceFactory clusterSvcFactory
+ ) {
+ var ctx = new ClusterLocalConfiguration(nodeName, msgSerializationRegistry);
+
+ ConfigurationManager nodeConfigurationMgr = new ConfigurationManager(
+ Collections.singleton(NetworkConfiguration.KEY),
+ Map.of(),
+ new TestConfigurationStorage(ConfigurationType.LOCAL)
+ );
+
+ var clusterSvc = clusterSvcFactory.createClusterService(
+ ctx,
+ nodeConfigurationMgr,
+ () -> nodeFinder
+ );
+
+ return new ClusterService() {
+ @Override public TopologyService topologyService() {
+ return clusterSvc.topologyService();
+ }
+
+ @Override public MessagingService messagingService() {
+ return clusterSvc.messagingService();
+ }
+
+ @Override public ClusterLocalConfiguration localConfiguration() {
+ return clusterSvc.localConfiguration();
+ }
+
+ @Override public boolean isStopped() {
+ return clusterSvc.isStopped();
+ }
+
+ @Override public void start() {
+ nodeConfigurationMgr.start();
+
+ nodeConfigurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY).
+ change(netCfg -> netCfg.changePort(port)).join();
+
+ clusterSvc.start();
+ }
+
+ @Override public void stop() {
+ clusterSvc.stop();
+ nodeConfigurationMgr.stop();
+ }
+ };
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index f6d3def..bb26275 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.network.scalecube;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.UUID;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
@@ -32,6 +33,8 @@ import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
@@ -41,6 +44,7 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -49,7 +53,11 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
*/
public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
- @Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
+ @Override public ClusterService createClusterService(
+ ClusterLocalConfiguration context,
+ ConfigurationManager nodeConfiguration,
+ Supplier<NodeFinder> nodeFinderSupplier
+ ) {
String consistentId = context.getName();
var topologyService = new ScaleCubeTopologyService();
@@ -62,39 +70,43 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
UUID launchId = UUID.randomUUID();
- var connectionManager = new ConnectionManager(
- context.getPort(),
- registry,
- consistentId,
- () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
- () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory)
- );
-
- var transport = new ScaleCubeDirectMarshallerTransport(connectionManager, topologyService, messageFactory);
-
- var cluster = new ClusterImpl(clusterConfig())
- .handler(cl -> new ClusterMessageHandler() {
- /** {@inheritDoc} */
- @Override public void onMessage(Message message) {
- messagingService.fireEvent(message);
- }
-
- /** {@inheritDoc} */
- @Override public void onMembershipEvent(MembershipEvent event) {
- topologyService.onMembershipEvent(event);
- }
- })
- .config(opts -> opts.memberAlias(consistentId))
- .transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
- .membership(opts -> opts.seedMembers(parseAddresses(context.getNodeFinder().findNodes())));
+ return new AbstractClusterService(context, topologyService, messagingService) {
+ private volatile ClusterImpl cluster;
- // resolve cyclic dependencies
- messagingService.setCluster(cluster);
+ private volatile ConnectionManager connectionMgr;
- return new AbstractClusterService(context, topologyService, messagingService) {
/** {@inheritDoc} */
@Override public void start() {
- connectionManager.start();
+ this.connectionMgr = new ConnectionManager(
+ nodeConfiguration.configurationRegistry().getConfiguration(NetworkConfiguration.KEY).value().port(),
+ registry,
+ consistentId,
+ () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+ () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory)
+ );
+
+ var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+
+ this.cluster = new ClusterImpl(clusterConfig())
+ .handler(cl -> new ClusterMessageHandler() {
+ /** {@inheritDoc} */
+ @Override public void onMessage(Message message) {
+ messagingService.fireEvent(message);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMembershipEvent(MembershipEvent event) {
+ topologyService.onMembershipEvent(event);
+ }
+ })
+ .config(opts -> opts.memberAlias(consistentId))
+ .transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
+ .membership(opts -> opts.seedMembers(parseAddresses(nodeFinderSupplier.get().findNodes())));
+
+ // resolve cyclic dependencies
+ messagingService.setCluster(cluster);
+
+ connectionMgr.start();
cluster.startAwait();
@@ -111,7 +123,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
cluster.shutdown();
cluster.onShutdown().block();
- connectionManager.stop();
+ connectionMgr.stop();
}
/** {@inheritDoc} */
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 8c2966c..73ad792 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -71,6 +71,13 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index ea4d055..bf25cee 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -33,7 +33,6 @@ import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NodeFinder;
@@ -49,6 +48,7 @@ import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.option.CliOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -122,11 +122,13 @@ public class ITCliServiceTest {
var registry = new MessageSerializationRegistryImpl();
- var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, nodeFinder, registry);
-
- var factory = new TestScaleCubeClusterServiceFactory();
-
- ClusterService clientSvc = factory.createClusterService(serviceConfig);
+ ClusterService clientSvc = ClusterServiceTestUtils.clusterService(
+ "client",
+ TestUtils.INIT_PORT - 1,
+ nodeFinder,
+ registry,
+ new TestScaleCubeClusterServiceFactory()
+ );
clientSvc.start();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 95d1c55..499056a 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -42,7 +42,6 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
@@ -87,6 +86,7 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -3430,7 +3430,13 @@ public class ITNodeTest {
var nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(peerId.getEndpoint(), nodeFinder);
+ ClusterService clusterService = ClusterServiceTestUtils.clusterService(
+ peerId.getEndpoint().toString(),
+ peerId.getEndpoint().getPort(),
+ nodeFinder,
+ new TestMessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
@@ -3460,13 +3466,13 @@ public class ITNodeTest {
* Creates a non-started {@link ClusterService}.
*/
private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
- var registry = new TestMessageSerializationRegistryImpl();
-
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
-
- var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
-
- return clusterServiceFactory.createClusterService(clusterConfig);
+ return ClusterServiceTestUtils.clusterService(
+ endpoint.toString(),
+ endpoint.getPort(),
+ nodeFinder,
+ new TestMessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
}
private void sendTestTaskAndWait(final Node node) throws InterruptedException {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index 89b4c3a..a4c6138 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.raft.server;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -29,6 +28,7 @@ import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;
@@ -67,9 +67,13 @@ abstract class RaftServerAbstractTest {
* @return The client cluster view.
*/
protected ClusterService clusterService(String name, int port, List<NetworkAddress> servers, boolean start) {
- var context = new ClusterLocalConfiguration(name, port, new StaticNodeFinder(servers), SERIALIZATION_REGISTRY);
-
- var network = NETWORK_FACTORY.createClusterService(context);
+ var network = ClusterServiceTestUtils.clusterService(
+ name,
+ port,
+ new StaticNodeFinder(servers),
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ );
if (start)
network.start();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 369b6b8..d0a983f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -35,7 +35,6 @@ import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
@@ -57,6 +56,7 @@ import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.Nullable;
import static java.util.stream.Collectors.collectingAndThen;
@@ -266,7 +266,13 @@ public class TestCluster {
NodeManager nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(listenAddr, nodeFinder);
+ ClusterService clusterService = ClusterServiceTestUtils.clusterService(
+ listenAddr.toString(),
+ listenAddr.getPort(),
+ nodeFinder,
+ new TestMessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
var rpcClient = new IgniteRpcClient(clusterService);
@@ -303,19 +309,6 @@ public class TestCluster {
}
}
- /**
- * Creates a non-started {@link ClusterService}.
- */
- private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
- var registry = new TestMessageSerializationRegistryImpl();
-
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
-
- var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
-
- return clusterServiceFactory.createClusterService(clusterConfig);
- }
-
public Node getNode(Endpoint endpoint) {
this.lock.lock();
try {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index d41a90d..69cfafc 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -17,18 +17,18 @@
package org.apache.ignite.raft.jraft.rpc;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
@@ -41,7 +41,13 @@ public class IgniteRpcTest extends AbstractRpcTest {
/** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
- ClusterService service = createService(endpoint.toString(), endpoint.getPort());
+ ClusterService service = ClusterServiceTestUtils.clusterService(
+ endpoint.toString(),
+ endpoint.getPort(),
+ new StaticNodeFinder(Collections.emptyList()),
+ new MessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
var server = new TestIgniteRpcServer(service, new NodeManager(), new NodeOptions()) {
@Override public void shutdown() {
@@ -60,7 +66,13 @@ public class IgniteRpcTest extends AbstractRpcTest {
@Override public RpcClient createClient0() {
int i = cntr.incrementAndGet();
- ClusterService service = createService("client" + i, endpoint.getPort() - i, addressFromEndpoint(endpoint));
+ ClusterService service = ClusterServiceTestUtils.clusterService(
+ "client" + i,
+ endpoint.getPort() - i,
+ new StaticNodeFinder(List.of(addressFromEndpoint(endpoint))),
+ new MessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
IgniteRpcClient client = new IgniteRpcClient(service) {
@Override public void shutdown() {
@@ -77,21 +89,6 @@ public class IgniteRpcTest extends AbstractRpcTest {
return client;
}
- /**
- * @param name Node name.
- * @param port Local port.
- * @param servers Server nodes of the cluster.
- * @return The client cluster view.
- */
- private static ClusterService createService(String name, int port, NetworkAddress... servers) {
- var registry = new MessageSerializationRegistryImpl();
- var nodeFinder = new StaticNodeFinder(List.of(servers));
- var context = new ClusterLocalConfiguration(name, port, nodeFinder, registry);
- var factory = new TestScaleCubeClusterServiceFactory();
-
- return factory.createClusterService(context);
- }
-
/** {@inheritDoc} */
@Override protected boolean waitForTopology(RpcClient client, int expected, long timeout) {
IgniteRpcClient client0 = (IgniteRpcClient) client;
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index 8085d21..21d5196 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -121,6 +121,13 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java
index e2ea59c..4485efd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.runner.app;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -27,9 +26,7 @@ import com.google.common.collect.Lists;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.client.handler.ClientHandlerModule;
-import org.apache.ignite.internal.app.IgnitionImpl;
-import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -43,7 +40,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -112,7 +108,9 @@ public class ITThinClientConnectionTest extends IgniteAbstractTest {
.changePartitions(10)
);
- var addrs = new String[]{"127.0.0.1:" + getClientPort("node0"), "127.0.0.1:" + getClientPort("node1")};
+ var addrs = new String[]{"127.0.0.1:" +
+ ((InetSocketAddress) ((IgniteImpl)startedNodes.stream().filter(node -> "node0".equals(node.name())).
+ findAny().get()).clientHandlerModule().localAddress()).getPort()};
for (var addr : addrs) {
try (var client = IgniteClient.builder().addresses(addr).build()) {
@@ -132,42 +130,4 @@ public class ITThinClientConnectionTest extends IgniteAbstractTest {
}
}
}
-
- /**
- * Gets the client listener port.
- *
- * @param nodeName Node name.
- * @return Port number.
- * @throws Exception When failed.
- */
- private static int getClientPort(String nodeName) throws Exception {
- InetSocketAddress addr = (InetSocketAddress) getClientHandlerModule(nodeName).localAddress();
- assertNotNull(addr);
-
- return addr.getPort();
- }
-
- /**
- * Gets the client handler module for the give node.
- *
- * @param nodeName Node name.
- * @return Client handler module.
- * @throws Exception When failed.
- */
- private static ClientHandlerModule getClientHandlerModule(String nodeName) throws Exception {
- Field field = IgnitionImpl.class.getDeclaredField("nodesStartedComponents");
- field.setAccessible(true);
-
- var componentMap = (Map<String, List<IgniteComponent>>) field.get(null);
- assertNotNull(componentMap);
-
- var components = componentMap.get(nodeName);
- assertNotNull(components);
-
- return components.stream()
- .filter(ClientHandlerModule.class::isInstance)
- .map(ClientHandlerModule.class::cast)
- .findFirst()
- .orElseThrow();
- }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
index b7c5646..cbc070c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
@@ -34,12 +34,12 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -86,7 +86,13 @@ public class ITDistributedConfigurationStorageTest {
Node(Path workDir) {
vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
- clusterService = createClusterService(addr);
+ clusterService = ClusterServiceTestUtils.clusterService(
+ addr.toString(),
+ addr.port(),
+ new StaticNodeFinder(List.of(addr)),
+ new MessageSerializationRegistryImpl(),
+ new TestScaleCubeClusterServiceFactory()
+ );
raftManager = new Loza(clusterService, workDir);
@@ -109,18 +115,6 @@ public class ITDistributedConfigurationStorageTest {
}
/**
- * Creates a test {@link ClusterService}.
- */
- private static ClusterService createClusterService(NetworkAddress addr) {
- var registry = new MessageSerializationRegistryImpl();
- var nodeFinder = new StaticNodeFinder(List.of(addr));
- var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, registry);
- var factory = new TestScaleCubeClusterServiceFactory();
-
- return factory.createClusterService(context);
- }
-
- /**
* Starts the created components.
*/
void start() throws Exception {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 26e31da..9c2755f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,51 +17,312 @@
package org.apache.ignite.internal.app;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
+import org.apache.ignite.internal.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.rest.RestModule;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite internal implementation.
*/
public class IgniteImpl implements Ignite {
- /** Distributed table manager. */
- private final IgniteTables distributedTblMgr;
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteImpl.class);
+
+ /**
+ * Path to the persistent storage used by the {@link org.apache.ignite.internal.vault.VaultService} component.
+ */
+ private static final Path VAULT_DB_PATH = Paths.get("vault");
+
+ /**
+ * Path for the partitions persistent storage.
+ */
+ private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
/** Ignite node name. */
private final String name;
- private final SqlQueryProcessor qryEngine;
+ /** Vault manager. */
+ private final VaultManager vaultMgr;
/** Configuration manager that handles node (local) configuration. */
- private final ConfigurationManager nodeConfigurationMgr;
+ private final ConfigurationManager nodeCfgMgr;
+
+ /** Cluster service (cluster network manager). */
+ private final ClusterService clusterSvc;
+
+ /** Raft manager. */
+ private final Loza raftMgr;
+
+ /** Meta storage manager. */
+ private final MetaStorageManager metaStorageMgr;
/** Configuration manager that handles cluster (distributed) configuration. */
- private final ConfigurationManager clusterConfigurationMgr;
+ private final ConfigurationManager clusterCfgMgr;
+
+ /** Baseline manager. */
+ private final BaselineManager baselineMgr;
+
+ /** Affinity manager. */
+ private final AffinityManager affinityMgr;
+
+ /** Schema manager. */
+ private final SchemaManager schemaMgr;
+
+ /** Distributed table manager. */
+ private final TableManager distributedTblMgr;
+
+ /** Query engine. */
+ private final SqlQueryProcessor qryEngine;
+
+ /** Rest module. */
+ private final RestModule restModule;
+
+ /** Client handler module. */
+ private final ClientHandlerModule clientHandlerModule;
+
+ /** Node status. Adds ability to stop currently starting node. */
+ private final AtomicReference<Status> status = new AtomicReference<>(Status.STARTING);
/**
+ * The Constructor.
+ *
* @param name Ignite node name.
- * @param tblMgr Table manager.
- * @param qryEngine Query processor.
- * @param nodeConfigurationMgr Configuration manager that handles node (local) configuration.
- * @param clusterConfigurationMgr Configuration manager that handles cluster (distributed) configuration.
+ * @param workDir Work directory for the started node. Must not be {@code null}.
*/
IgniteImpl(
String name,
- IgniteTables tblMgr,
- SqlQueryProcessor qryEngine,
- ConfigurationManager nodeConfigurationMgr,
- ConfigurationManager clusterConfigurationMgr
+ Path workDir
) {
this.name = name;
- this.distributedTblMgr = tblMgr;
- this.qryEngine = qryEngine;
- this.nodeConfigurationMgr = nodeConfigurationMgr;
- this.clusterConfigurationMgr = clusterConfigurationMgr;
+
+ vaultMgr = createVault(workDir);
+
+ nodeCfgMgr = new ConfigurationManager(
+ Arrays.asList(
+ NetworkConfiguration.KEY,
+ NodeConfiguration.KEY,
+ RestConfiguration.KEY,
+ ClientConnectorConfiguration.KEY
+ ),
+ Map.of(),
+ new LocalConfigurationStorage(vaultMgr)
+ );
+
+ clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService(
+ new ClusterLocalConfiguration(
+ name,
+ new MessageSerializationRegistryImpl()
+ ),
+ nodeCfgMgr,
+ () -> StaticNodeFinder.fromConfiguration(nodeCfgMgr.configurationRegistry().
+ getConfiguration(NetworkConfiguration.KEY).value())
+ );
+
+ raftMgr = new Loza(clusterSvc, workDir);
+
+ metaStorageMgr = new MetaStorageManager(
+ vaultMgr,
+ nodeCfgMgr,
+ clusterSvc,
+ raftMgr
+ );
+
+ clusterCfgMgr = new ConfigurationManager(
+ Arrays.asList(
+ ClusterConfiguration.KEY,
+ TablesConfiguration.KEY
+ ),
+ Map.of(),
+ new DistributedConfigurationStorage(metaStorageMgr, vaultMgr)
+ );
+
+ baselineMgr = new BaselineManager(
+ clusterCfgMgr,
+ metaStorageMgr,
+ clusterSvc
+ );
+
+ affinityMgr = new AffinityManager(
+ clusterCfgMgr,
+ metaStorageMgr,
+ baselineMgr
+ );
+
+ schemaMgr = new SchemaManager(
+ clusterCfgMgr,
+ metaStorageMgr,
+ vaultMgr
+ );
+
+ distributedTblMgr = new TableManager(
+ nodeCfgMgr,
+ clusterCfgMgr,
+ metaStorageMgr,
+ schemaMgr,
+ affinityMgr,
+ raftMgr,
+ getPartitionsStorePath(workDir)
+ );
+
+ qryEngine = new SqlQueryProcessor(
+ clusterSvc,
+ distributedTblMgr
+ );
+
+ restModule = new RestModule(nodeCfgMgr, clusterCfgMgr);
+
+ clientHandlerModule = new ClientHandlerModule(distributedTblMgr, nodeCfgMgr.configurationRegistry());
+ }
+
+ /**
+ * Starts ignite node.
+ *
+ * @param cfg Optional node configuration based on {@link org.apache.ignite.configuration.schemas.runner.NodeConfigurationSchema}
+ * and {@link org.apache.ignite.configuration.schemas.network.NetworkConfigurationSchema}. Following rules are used
+ * for applying the configuration properties:
+ * <ol>
+ * <li>Specified property overrides existing one or just applies itself if it wasn't
+ * previously specified.</li>
+ * <li>All non-specified properties either use previous value or use default one from
+ * corresponding configuration schema.</li>
+ * </ol>
+ * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is
+ * used. If no configuration was provided defaults are used for all configuration properties. In case of node
+ * restart, specified properties override existing ones, non specified properties that also weren't specified
+ * previously use default values. Please pay attention that previously specified properties are searched in the
+ * {@code workDir} specified by the user.
+ */
+ public void start(@Nullable String cfg) {
+ List<IgniteComponent> startedComponents = new ArrayList<>();
+
+ try {
+ // Vault startup.
+ doStartComponent(
+ name,
+ startedComponents,
+ vaultMgr
+ );
+
+ vaultMgr.putName(name).join();
+
+ // Node configuration manager startup.
+ doStartComponent(
+ name,
+ startedComponents,
+ nodeCfgMgr);
+
+ // Node configuration manager bootstrap.
+ if (cfg != null) {
+ try {
+ nodeCfgMgr.bootstrap(cfg);
+ }
+ catch (Exception e) {
+ LOG.warn("Unable to parse user-specific configuration, default configuration will be used: {}",
+ e.getMessage());
+ }
+ }
+ else
+ nodeCfgMgr.configurationRegistry().initializeDefaults();
+
+ // Start the remaining components.
+ List<IgniteComponent> otherComponents = List.of(
+ clusterSvc,
+ raftMgr,
+ metaStorageMgr,
+ clusterCfgMgr,
+ baselineMgr,
+ affinityMgr,
+ schemaMgr,
+ distributedTblMgr,
+ qryEngine,
+ restModule,
+ clientHandlerModule
+ );
+
+ for (IgniteComponent component : otherComponents)
+ doStartComponent(name, startedComponents, component);
+
+ // Deploy all registered watches because all components are ready and have registered their listeners.
+ metaStorageMgr.deployWatches();
+
+ if (!status.compareAndSet(Status.STARTING, Status.STARTED))
+ throw new NodeStoppingException();
+ }
+ catch (Exception e) {
+ String errMsg = "Unable to start node=[" + name + "].";
+
+ LOG.error(errMsg, e);
+
+ doStopNode(startedComponents);
+
+ throw new IgniteException(errMsg, e);
+ }
+ }
+
+ /**
+ * Stops ignite node.
+ */
+ public void stop() {
+ AtomicBoolean explicitStop = new AtomicBoolean();
+
+ status.getAndUpdate(status -> {
+ if (status == Status.STARTED)
+ explicitStop.set(true);
+ else
+ explicitStop.set(false);
+
+ return Status.STOPPING;
+ });
+
+ if (explicitStop.get()) {
+ doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, metaStorageMgr,
+ clusterCfgMgr, baselineMgr, affinityMgr, schemaMgr, distributedTblMgr, qryEngine, restModule));
+ }
}
/** {@inheritDoc} */
@@ -92,13 +353,132 @@ public class IgniteImpl implements Ignite {
* @return Node configuration.
*/
public ConfigurationRegistry nodeConfiguration() {
- return nodeConfigurationMgr.configurationRegistry();
+ return nodeCfgMgr.configurationRegistry();
}
/**
* @return Cluster configuration.
*/
public ConfigurationRegistry clusterConfiguration() {
- return clusterConfigurationMgr.configurationRegistry();
+ return clusterCfgMgr.configurationRegistry();
+ }
+
+ /**
+ * @return Client handler module.
+ */
+ public ClientHandlerModule clientHandlerModule() {
+ return clientHandlerModule;
+ }
+
+ /**
+ * Checks node status. If it's {@link Status#STOPPING} then prevents further starting and throws NodeStoppingException that will
+ * lead to stopping already started components later on, otherwise starts component and add it to started components
+ * list.
+ *
+ * @param nodeName Node name.
+ * @param startedComponents List of already started components for given node.
+ * @param component Ignite component to start.
+ * @param <T> Ignite component type.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ private <T extends IgniteComponent> void doStartComponent(
+ @NotNull String nodeName,
+ @NotNull List<IgniteComponent> startedComponents,
+ @NotNull T component
+ ) throws NodeStoppingException {
+ if (status.get() == Status.STOPPING)
+ throw new NodeStoppingException("Node=[" + nodeName + "] was stopped.");
+ else {
+ startedComponents.add(component);
+
+ component.start();
+ }
+ }
+
+ /**
+ * Calls {@link IgniteComponent#beforeNodeStop()} and then {@link IgniteComponent#stop()} for all components in
+ * start-reverse-order. Cleanups node started components map and node status map.
+ *
+ * @param startedComponents List of already started components for given node.
+ */
+ private void doStopNode(@NotNull List<IgniteComponent> startedComponents) {
+ ListIterator<IgniteComponent> beforeStopIter =
+ startedComponents.listIterator(startedComponents.size() - 1);
+
+ while (beforeStopIter.hasPrevious()) {
+ IgniteComponent componentToExecBeforeNodeStop = beforeStopIter.previous();
+
+ try {
+ componentToExecBeforeNodeStop.beforeNodeStop();
+ }
+ catch (Exception e) {
+ LOG.error("Unable to execute before node stop on the component=[" +
+ componentToExecBeforeNodeStop + "] within node=[" + name + ']', e);
+ }
+ }
+
+ ListIterator<IgniteComponent> stopIter =
+ startedComponents.listIterator(startedComponents.size() - 1);
+
+ while (stopIter.hasPrevious()) {
+ IgniteComponent componentToStop = stopIter.previous();
+
+ try {
+ componentToStop.stop();
+ }
+ catch (Exception e) {
+ LOG.error("Unable to stop component=[" + componentToStop + "] within node=[" + name + ']', e);
+ }
+ }
+ }
+
+ /**
+ * Starts the Vault component.
+ */
+ private static VaultManager createVault(Path workDir) {
+ Path vaultPath = workDir.resolve(VAULT_DB_PATH);
+
+ try {
+ Files.createDirectories(vaultPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ return new VaultManager(new PersistentVaultService(vaultPath));
+ }
+
+ /**
+ * Returns a path to the partitions store directory. Creates a directory if it doesn't exist.
+ *
+ * @param workDir Ignite work directory.
+ * @return Partitions store path.
+ */
+ @NotNull
+ private static Path getPartitionsStorePath(Path workDir) {
+ Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH);
+
+ try {
+ Files.createDirectories(partitionsStore);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
+ }
+
+ return partitionsStore;
+ }
+
+ /**
+ * Node state.
+ */
+ private enum Status {
+ /** */
+ STARTING,
+
+ /** */
+ STARTED,
+
+ /** */
+ STOPPING
}
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index d43c6d9..bb15bba 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -22,52 +22,13 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
-import org.apache.ignite.client.handler.ClientHandlerModule;
-import org.apache.ignite.configuration.RootKey;
-import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
-import org.apache.ignite.configuration.schemas.table.TableValidator;
-import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
-import org.apache.ignite.internal.affinity.AffinityManager;
-import org.apache.ignite.internal.baseline.BaselineManager;
-import org.apache.ignite.internal.configuration.ConfigurationManager;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.processors.query.calcite.SqlQueryProcessor;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.schema.configuration.SchemaTableValidatorImpl;
-import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
-import org.apache.ignite.internal.storage.LocalConfigurationStorage;
-import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.VaultService;
-import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.LoggerMessageHelper;
-import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterLocalConfiguration;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.rest.RestModule;
import org.apache.ignite.utils.IgniteProperties;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -79,16 +40,6 @@ public class IgnitionImpl implements Ignition {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(IgnitionImpl.class);
- /**
- * Path to the persistent storage used by the {@link VaultService} component.
- */
- private static final Path VAULT_DB_PATH = Paths.get("vault");
-
- /**
- * Path for the partitions persistent storage.
- */
- private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
-
/** */
private static final String[] BANNER = {
"",
@@ -109,16 +60,11 @@ public class IgnitionImpl implements Ignition {
/** */
private static final String VER_KEY = "version";
- /** Mapping of a node name to a node status, e.g. node0 -> starting or node1 -> stopping. */
- private static Map<String, NodeState> nodesStatus = new ConcurrentHashMap<>();
-
/**
- * Mapping of a node name to a started node components list.
- * Given map helps to stop node by stopping all it's components in an appropriate order both
- * when node is already started which means that all components are ready and
- * if node is in a middle of a startup process which means that only part of its components are prepared.
+ * Node name to node instance mapping.
+ * Please pay attention, that nodes in given map might be in any state: STARTING, STARTED, STOPPED.
*/
- private static final Map<String, List<IgniteComponent>> nodesStartedComponents = new ConcurrentHashMap<>();
+ private static Map<String, IgniteImpl> nodes = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public Ignite start(@NotNull String nodeName, @Nullable Path cfgPath, @NotNull Path workDir) {
@@ -130,22 +76,24 @@ public class IgnitionImpl implements Ignition {
);
}
catch (IOException e) {
- LOG.warn("Unable to read user specific configuration, default configuration will be used: " + e.getMessage());
+ LOG.warn("Unable to read user specific configuration, default configuration will be used: "
+ + e.getMessage());
return start(nodeName, workDir);
}
}
/** {@inheritDoc} */
- @Override public Ignite start(@NotNull String name, @Nullable InputStream config, @NotNull Path workDir) {
+ @Override public Ignite start(@NotNull String name, @Nullable InputStream cfg, @NotNull Path workDir) {
try {
return doStart(
name,
- config == null ? null : new String(config.readAllBytes(), StandardCharsets.UTF_8),
+ cfg == null ? null : new String(cfg.readAllBytes(), StandardCharsets.UTF_8),
workDir
);
}
catch (IOException e) {
- LOG.warn("Unable to read user specific configuration, default configuration will be used: " + e.getMessage());
+ LOG.warn("Unable to read user specific configuration, default configuration will be used: "
+ + e.getMessage());
return start(name, workDir);
}
}
@@ -157,20 +105,11 @@ public class IgnitionImpl implements Ignition {
/** {@inheritDoc} */
@Override public void stop(@NotNull String name) {
- AtomicBoolean explicitStop = new AtomicBoolean();
-
- nodesStatus.computeIfPresent(name, (nodeName, state) -> {
- if (state == NodeState.STARTED)
- explicitStop.set(true);
+ nodes.computeIfPresent(name, (nodeName, node) -> {
+ node.stop();
- return NodeState.STOPPING;
+ return null;
});
-
- if (explicitStop.get()) {
- List<IgniteComponent> startedComponents = nodesStartedComponents.get(name);
-
- doStopNode(name, startedComponents);
- }
}
/**
@@ -185,10 +124,12 @@ public class IgnitionImpl implements Ignition {
if (nodeName.isEmpty())
throw new IllegalArgumentException("Node name must not be null or empty.");
- NodeState prevNodeState = nodesStatus.putIfAbsent(nodeName, NodeState.STARTING);
+ IgniteImpl nodeToStart = new IgniteImpl(nodeName, workDir);
+
+ IgniteImpl prevNode = nodes.putIfAbsent(nodeName, nodeToStart);
- if (prevNodeState != null) {
- String errMsg = "Node with name=[" + nodeName + "] already exists in state=[" + prevNodeState + "].";
+ if (prevNode != null) {
+ String errMsg = "Node with name=[" + nodeName + "] already exists.";
LOG.error(errMsg);
@@ -197,321 +138,11 @@ public class IgnitionImpl implements Ignition {
ackBanner();
- List<IgniteComponent> startedComponents = new ArrayList<>();
-
- try {
- // Vault startup.
- VaultManager vaultMgr = doStartComponent(
- nodeName,
- startedComponents,
- createVault(workDir)
- );
-
- vaultMgr.putName(nodeName).join();
-
- List<RootKey<?, ?>> nodeRootKeys = List.of(
- NetworkConfiguration.KEY,
- NodeConfiguration.KEY,
- RestConfiguration.KEY,
- ClientConnectorConfiguration.KEY
- );
-
- // Bootstrap node configuration manager.
- ConfigurationManager nodeCfgMgr = doStartComponent(
- nodeName,
- startedComponents,
- new ConfigurationManager(
- nodeRootKeys,
- Map.of(),
- new LocalConfigurationStorage(vaultMgr)
- )
- );
-
- if (cfgContent != null) {
- try {
- nodeCfgMgr.bootstrap(cfgContent);
- }
- catch (Exception e) {
- LOG.warn("Unable to parse user-specific configuration, default configuration will be used: {}", e.getMessage());
- }
- }
- else
- nodeCfgMgr.configurationRegistry().initializeDefaults();
-
- NetworkView netConfigurationView =
- nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY).value();
-
- var serializationRegistry = new MessageSerializationRegistryImpl();
-
- var nodeFinder = StaticNodeFinder.fromConfiguration(netConfigurationView);
-
- // Network startup.
- ClusterService clusterNetSvc = doStartComponent(
- nodeName,
- startedComponents,
- new ScaleCubeClusterServiceFactory().createClusterService(
- new ClusterLocalConfiguration(
- nodeName,
- netConfigurationView.port(),
- nodeFinder,
- serializationRegistry
- )
- )
- );
-
- // Raft Component startup.
- Loza raftMgr = doStartComponent(
- nodeName,
- startedComponents,
- new Loza(clusterNetSvc, workDir)
- );
-
- // Meta storage Component startup.
- MetaStorageManager metaStorageMgr = doStartComponent(
- nodeName,
- startedComponents,
- new MetaStorageManager(
- vaultMgr,
- nodeCfgMgr,
- clusterNetSvc,
- raftMgr
- )
- );
-
- // TODO IGNITE-14578 Bootstrap configuration manager with distributed configuration.
-
- List<RootKey<?, ?>> clusterRootKeys = List.of(
- ClusterConfiguration.KEY,
- TablesConfiguration.KEY
- );
-
- // Start cluster configuration manager.
- ConfigurationManager clusterCfgMgr = doStartComponent(
- nodeName,
- startedComponents,
- new ConfigurationManager(
- clusterRootKeys,
- Map.of(TableValidator.class, Set.of(SchemaTableValidatorImpl.INSTANCE)),
- new DistributedConfigurationStorage(metaStorageMgr, vaultMgr)
- )
- );
-
- // Baseline manager startup.
- BaselineManager baselineMgr = doStartComponent(
- nodeName,
- startedComponents,
- new BaselineManager(
- clusterCfgMgr,
- metaStorageMgr,
- clusterNetSvc
- )
- );
-
- // Affinity manager startup.
- AffinityManager affinityMgr = doStartComponent(
- nodeName,
- startedComponents,
- new AffinityManager(
- clusterCfgMgr,
- metaStorageMgr,
- baselineMgr
- )
- );
-
- // Schema manager startup.
- SchemaManager schemaMgr = doStartComponent(
- nodeName,
- startedComponents,
- new SchemaManager(
- clusterCfgMgr,
- metaStorageMgr,
- vaultMgr
- )
- );
-
- // Distributed table manager startup.
- TableManager distributedTblMgr = doStartComponent(
- nodeName,
- startedComponents,
- new TableManager(
- nodeCfgMgr,
- clusterCfgMgr,
- metaStorageMgr,
- schemaMgr,
- affinityMgr,
- raftMgr,
- getPartitionsStorePath(workDir)
- )
- );
-
- SqlQueryProcessor qryProc = doStartComponent(
- nodeName,
- startedComponents,
- new SqlQueryProcessor(
- clusterNetSvc,
- distributedTblMgr
- )
- );
-
- doStartComponent(
- nodeName,
- startedComponents,
- new RestModule(nodeCfgMgr, clusterCfgMgr)
- );
-
- doStartComponent(
- nodeName,
- startedComponents,
- new ClientHandlerModule(distributedTblMgr, nodeCfgMgr.configurationRegistry()));
-
- // Deploy all resisted watches cause all components are ready and have registered their listeners.
- metaStorageMgr.deployWatches();
-
- AtomicBoolean explicitStop = new AtomicBoolean();
-
- nodesStatus.computeIfPresent(nodeName, (name, state) -> {
- switch (state) {
- case STARTING:
- nodesStartedComponents.put(name, startedComponents);
-
- return NodeState.STARTED;
- case STOPPING:
- explicitStop.set(true);
- }
-
- return state;
- });
-
- if (explicitStop.get())
- throw new NodeStoppingException();
-
- ackSuccessStart();
-
- return new IgniteImpl(
- nodeName,
- distributedTblMgr,
- qryProc,
- nodeCfgMgr,
- clusterCfgMgr
- );
- }
- catch (Exception e) {
- String errMsg = "Unable to start node=[" + nodeName + "].";
-
- LOG.error(errMsg, e);
+ nodeToStart.start(cfgContent);
- doStopNode(nodeName, startedComponents);
+ ackSuccessStart();
- throw new IgniteException(errMsg, e);
- }
- }
-
- /**
- * Returns a path to the partitions store directory.
- * Creates a directory if it doesn't exist.
- *
- * @param workDir Ignite work directory.
- * @return Partitions store path.
- */
- @NotNull
- private static Path getPartitionsStorePath(Path workDir) {
- Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH);
-
- try {
- Files.createDirectories(partitionsStore);
- } catch (IOException e) {
- throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
- }
-
- return partitionsStore;
- }
-
- /**
- * Starts the Vault component.
- */
- private static VaultManager createVault(Path workDir) {
- Path vaultPath = workDir.resolve(VAULT_DB_PATH);
-
- try {
- Files.createDirectories(vaultPath);
- }
- catch (IOException e) {
- throw new IgniteInternalException(e);
- }
-
- var vaultMgr = new VaultManager(new PersistentVaultService(vaultPath));
-
- return vaultMgr;
- }
-
- /**
- * Checks node status. If it's STOPPING then prevents further starting and throws NodeStoppingException
- * that will lead to stopping already started components later on,
- * otherwise starts component and add it to started components list.
- *
- * @param nodeName Node name.
- * @param startedComponents List of already started components for given node.
- * @param component Ignite component to start.
- * @param <T> Ignite component type.
- * @return Started ignite component.
- * @throws NodeStoppingException If node stopping intention was detected.
- */
- private static <T extends IgniteComponent> T doStartComponent(
- @NotNull String nodeName,
- @NotNull List<IgniteComponent> startedComponents,
- @NotNull T component
- ) throws NodeStoppingException {
- if (nodesStatus.get(nodeName) == NodeState.STOPPING)
- throw new NodeStoppingException("Node=[" + nodeName + "] was stopped.");
- else {
- startedComponents.add(component);
-
- component.start();
-
- return component;
- }
- }
-
- /**
- * Calls {@link IgniteComponent#beforeNodeStop()} and then {@link IgniteComponent#stop()} for all components
- * in start-reverse-order. Cleanups node started components map and node status map.
- *
- * @param nodeName Node name.
- * @param startedComponents List of already started components for given node.
- */
- private static void doStopNode(@NotNull String nodeName, @NotNull List<IgniteComponent> startedComponents) {
- ListIterator<IgniteComponent> beforeStopIter =
- startedComponents.listIterator(startedComponents.size() - 1);
-
- while (beforeStopIter.hasPrevious()) {
- IgniteComponent componentToExecuteBeforeNodeStop = beforeStopIter.previous();
-
- try {
- componentToExecuteBeforeNodeStop.beforeNodeStop();
- }
- catch (Exception e) {
- LOG.error("Unable to execute before node stop on the component=[" +
- componentToExecuteBeforeNodeStop + "] within node=[" + nodeName + ']', e);
- }
- }
-
- ListIterator<IgniteComponent> stopIter =
- startedComponents.listIterator(startedComponents.size() - 1);
-
- while (stopIter.hasPrevious()) {
- IgniteComponent componentToStop = stopIter.previous();
-
- try {
- componentToStop.stop();
- }
- catch (Exception e) {
- LOG.error("Unable to stop component=[" + componentToStop + "] within node=[" + nodeName + ']', e);
- }
- }
-
- nodesStartedComponents.remove(nodeName);
-
- nodesStatus.remove(nodeName);
+ return nodeToStart;
}
/** */
@@ -526,21 +157,7 @@ public class IgnitionImpl implements Ignition {
String banner = String.join("\n", BANNER);
LOG.info(() ->
- LoggerMessageHelper.format("{}\n" + " ".repeat(22) + "Apache Ignite ver. {}\n", banner, ver),
+ LoggerMessageHelper.format("{}\n" + " ".repeat(22) + "Apache Ignite ver. {}\n", banner, ver),
null);
}
-
- /**
- * Node state.
- */
- private enum NodeState {
- /** */
- STARTING,
-
- /** */
- STARTED,
-
- /** */
- STOPPING
- }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 0b078dc..1718ce1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
@@ -66,6 +65,7 @@ import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -539,9 +539,16 @@ public class ITDistributedTableTest {
* @return The client cluster view.
*/
private static ClusterService startClient(String name, int port, NodeFinder nodeFinder) {
- var context = new ClusterLocalConfiguration(name, port, nodeFinder, SERIALIZATION_REGISTRY);
- var network = NETWORK_FACTORY.createClusterService(context);
+ var network = ClusterServiceTestUtils.clusterService(
+ name,
+ port,
+ nodeFinder,
+ SERIALIZATION_REGISTRY,
+ NETWORK_FACTORY
+ );
+
network.start();
+
return network;
}