You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/22 11:25:25 UTC
[ignite-3] branch main updated: IGNITE-14092 Introduce the Node Finder API (#238)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0f0630f IGNITE-14092 Introduce the Node Finder API (#238)
0f0630f is described below
commit 0f0630f316b5457918a25f8130e0c05a97c14db9
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Jul 22 14:25:17 2021 +0300
IGNITE-14092 Introduce the Node Finder API (#238)
---
.../ITMetaStorageServicePersistenceTest.java | 12 +++--
.../client/ITMetaStorageServiceTest.java | 34 ++++++-------
.../ignite/network/ClusterLocalConfiguration.java | 17 ++++---
.../java/org/apache/ignite/network/NodeFinder.java | 32 +++++++++++++
modules/network/pom.xml | 5 ++
.../network/scalecube/ITNodeRestartsTest.java | 32 +++++--------
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 23 ++++-----
.../apache/ignite/network/StaticNodeFinder.java | 56 ++++++++++++++++++++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 2 +-
.../ignite/network/LocalPortRangeNodeFinder.java | 49 +++++++++++++++++++
.../ignite/raft/jraft/core/ITCliServiceTest.java | 12 +++--
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 24 +++++-----
.../ignite/raft/server/RaftServerAbstractTest.java | 3 +-
.../apache/ignite/raft/jraft/core/TestCluster.java | 23 ++++-----
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 6 ++-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 10 +---
.../apache/ignite/internal/app/IgnitionImpl.java | 10 ++--
.../ignite/distributed/ITDistributedTableTest.java | 29 ++++++-----
18 files changed, 256 insertions(+), 123 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
index 3783ccc..7cb4cb6 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
@@ -343,8 +344,10 @@ public class ITMetaStorageServicePersistenceTest {
/**
* Creates a cluster service.
*/
- private ClusterService clusterService(String name, int port, List<NetworkAddress> servers) {
- var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+ private ClusterService clusterService(String name, int port, NetworkAddress otherPeer) {
+ var nodeFinder = new StaticNodeFinder(List.of(otherPeer));
+
+ var context = new ClusterLocalConfiguration(name, port, nodeFinder, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
@@ -365,7 +368,7 @@ public class ITMetaStorageServicePersistenceTest {
private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
- ClusterService service = clusterService("server" + idx, PORT + idx, List.of(addr));
+ ClusterService service = clusterService("server" + idx, PORT + idx, addr);
Path jraft = workDir.resolve("jraft" + idx);
@@ -407,8 +410,7 @@ public class ITMetaStorageServicePersistenceTest {
* Starts a client with a specific address.
*/
private RaftGroupService startClient(String groupId, NetworkAddress addr) {
- ClusterService clientNode = clusterService(
- "client_" + groupId + "_", CLIENT_PORT + clients.size(), List.of(addr));
+ ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
List.of(new Peer(addr)), false, 200) {
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 19a95a1..18c99d2 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -43,8 +42,10 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
@@ -58,6 +59,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -163,12 +165,11 @@ public class ITMetaStorageServiceTest {
*/
@BeforeEach
public void beforeTest() {
- List<NetworkAddress> servers = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
- .mapToObj(port -> new NetworkAddress("localhost", port))
- .collect(Collectors.toList());
+ var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
- for (int i = 0; i < NODES; i++)
- cluster.add(startClusterNode("node_" + i, NODE_PORT_BASE + i, servers));
+ nodeFinder.findNodes().stream()
+ .map(addr -> startClusterNode(addr, nodeFinder))
+ .forEach(cluster::add);
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
@@ -328,7 +329,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(Collectors.toList());
+ map(ByteArray::bytes).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -337,7 +338,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(Collectors.toList());
+ map(Entry::value).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expVals.get(i), values.get(i));
@@ -367,7 +368,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(Collectors.toList());
+ map(ByteArray::bytes).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -376,7 +377,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(Collectors.toList());
+ map(Entry::value).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expVals.get(i), values.get(i));
@@ -459,7 +460,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(Collectors.toList());
+ map(ByteArray::bytes).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -483,7 +484,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(Collectors.toList());
+ map(ByteArray::bytes).collect(toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -963,13 +964,12 @@ public class ITMetaStorageServiceTest {
}
/**
- * @param name Node name.
- * @param port Local port.
- * @param srvs Server nodes of the cluster.
+ * @param addr Node address.
+ * @param nodeFinder Node finder.
* @return The client cluster view.
*/
- private ClusterService startClusterNode(String name, int port, List<NetworkAddress> srvs) {
- var ctx = new ClusterLocalConfiguration(name, port, srvs, SERIALIZATION_REGISTRY);
+ private static ClusterService startClusterNode(NetworkAddress addr, NodeFinder nodeFinder) {
+ var ctx = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, SERIALIZATION_REGISTRY);
var net = NETWORK_FACTORY.createClusterService(ctx);
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index 535eb81..6d107b1 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -17,7 +17,6 @@
package org.apache.ignite.network;
-import java.util.List;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -33,8 +32,8 @@ public class ClusterLocalConfiguration {
/** The port. */
private final int port;
- /** Addresses of other nodes. */
- private final List<NetworkAddress> memberAddresses;
+ /** Node finder. */
+ private final NodeFinder nodeFinder;
/** Message mapper providers. */
private final MessageSerializationRegistry serializationRegistry;
@@ -42,15 +41,15 @@ public class ClusterLocalConfiguration {
/**
* @param name Local name.
* @param port Local port.
- * @param memberAddresses Other cluster member addresses.
+ * @param nodeFinder Node finder for discovering the initial cluster members.
* @param serializationRegistry Message serialization registry.
*/
public ClusterLocalConfiguration(
- String name, int port, List<NetworkAddress> memberAddresses, MessageSerializationRegistry serializationRegistry
+ String name, int port, NodeFinder nodeFinder, MessageSerializationRegistry serializationRegistry
) {
this.name = name;
this.port = port;
- this.memberAddresses = List.copyOf(memberAddresses);
+ this.nodeFinder = nodeFinder;
this.serializationRegistry = serializationRegistry;
}
@@ -69,10 +68,10 @@ public class ClusterLocalConfiguration {
}
/**
- * @return Addresses of other nodes.
+ * @return Node finder for discovering the initial cluster members.
*/
- public List<NetworkAddress> getMemberAddresses() {
- return memberAddresses;
+ public NodeFinder getNodeFinder() {
+ return nodeFinder;
}
/**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java b/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java
new file mode 100644
index 0000000..b7c2874
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+
+/**
+ * Contract for components that discover the initial set of network cluster members as a list of addresses.
+ */
+public interface NodeFinder {
+ /**
+ * Discovers the initial set of cluster members and returns their network addresses.
+ *
+ * @return Addresses of the initial cluster members.
+ */
+ List<NetworkAddress> findNodes();
+}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 1c1abe6..c1c62b8 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -35,6 +35,11 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration</artifactId>
</dependency>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index 02af912..d09afc8 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -18,18 +18,19 @@ package org.apache.ignite.network.scalecube;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -60,20 +61,11 @@ class ITNodeRestartsTest {
public void testRestarts() {
final int initPort = 3344;
- String addr = "localhost";
- List<NetworkAddress> addresses = IntStream.range(0, 5)
- .mapToObj(i -> new NetworkAddress(addr, (initPort + i)))
- .collect(toList());
+ var nodeFinder = new LocalPortRangeNodeFinder(initPort, initPort + 5);
- services = new ArrayList<>(addresses.size());
-
- for (int i = 0; i < addresses.size(); i++) {
- NetworkAddress address = addresses.get(i);
-
- ClusterService svc = startNetwork(address.toString(), initPort + i, addresses);
-
- services.add(svc);
- }
+ services = nodeFinder.findNodes().stream()
+ .map(addr -> startNetwork(addr, nodeFinder))
+ .collect(Collectors.toCollection(ArrayList::new)); // ensure mutability
for (ClusterService service : services) {
assertTrue(waitForTopology(service, 5, 5_000), service.topologyService().localMember().toString()
@@ -83,6 +75,8 @@ class ITNodeRestartsTest {
int idx0 = 0;
int idx1 = 2;
+ List<NetworkAddress> addresses = nodeFinder.findNodes();
+
LOG.info("Shutdown {}", addresses.get(idx0));
services.get(idx0).shutdown();
@@ -90,11 +84,11 @@ class ITNodeRestartsTest {
services.get(idx1).shutdown();
LOG.info("Starting {}", addresses.get(idx0));
- ClusterService svc0 = startNetwork(addresses.get(idx0).toString(), initPort + idx0, addresses);
+ ClusterService svc0 = startNetwork(addresses.get(idx0), nodeFinder);
services.set(idx0, svc0);
LOG.info("Starting {}", addresses.get(idx1));
- ClusterService svc2 = startNetwork(addresses.get(idx1).toString(), initPort + idx1, addresses);
+ ClusterService svc2 = startNetwork(addresses.get(idx1), nodeFinder);
services.set(idx1, svc2);
for (ClusterService service : services) {
@@ -106,8 +100,8 @@ class ITNodeRestartsTest {
}
/** */
- private ClusterService startNetwork(String name, int port, List<NetworkAddress> addresses) {
- var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
+ private ClusterService startNetwork(NetworkAddress addr, NodeFinder nodeFinder) {
+ var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index a2c78da..96d6155 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -26,8 +26,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import org.apache.ignite.internal.network.NetworkMessageTypes;
@@ -35,8 +35,10 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessageTypes;
@@ -365,26 +367,25 @@ class ITScaleCubeNetworkMessagingTest {
int initialPort = 3344;
- List<NetworkAddress> addresses = IntStream.range(0, numOfNodes)
- .mapToObj(i -> new NetworkAddress("localhost", initialPort + i))
- .collect(Collectors.toUnmodifiableList());
+ var nodeFinder = new LocalPortRangeNodeFinder(initialPort, initialPort + numOfNodes);
+
+ var isInitial = new AtomicBoolean(true);
- members = IntStream.range(0, numOfNodes)
- .mapToObj(i -> startNode("Node #" + i, initialPort + i, addresses, i == 0))
+ members = nodeFinder.findNodes().stream()
+ .map(addr -> startNode(addr, nodeFinder, isInitial.getAndSet(false)))
.collect(Collectors.toUnmodifiableList());
}
/**
* Start cluster node.
*
- * @param name Node name.
- * @param port Node port.
- * @param addresses Addresses of other nodes.
+ * @param addr Node address.
+ * @param nodeFinder Node finder.
* @param initial Whether this node is the first one.
* @return Started cluster node.
*/
- private ClusterService startNode(String name, int port, List<NetworkAddress> addresses, boolean initial) {
- var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
+ private ClusterService startNode(NetworkAddress addr, NodeFinder nodeFinder, boolean initial) {
+ var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java b/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java
new file mode 100644
index 0000000..8bb891a
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.configuration.schemas.network.NetworkView;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+/**
+ * {@code NodeFinder} implementation that encapsulates a predefined, immutable list of network addresses.
+ */
+public class StaticNodeFinder implements NodeFinder {
+ /** Immutable snapshot of the initial cluster member addresses, taken at construction time. */
+ private final List<NetworkAddress> addresses;
+
+ /**
+ * @param addresses Addresses of initial cluster members; copied defensively, must not be or contain {@code null}.
+ */
+ public StaticNodeFinder(List<NetworkAddress> addresses) {
+ this.addresses = List.copyOf(addresses);
+ }
+
+ /**
+ * Creates a node finder extracting the initial cluster member addresses from the given configuration.
+ * @param networkConfiguration Network configuration whose {@code netClusterNodes} entries are converted.
+ * @return Node finder over the addresses obtained via {@code NetworkAddress.from} for every configured entry.
+ */
+ public static StaticNodeFinder fromConfiguration(NetworkView networkConfiguration) {
+ return Arrays.stream(networkConfiguration.netClusterNodes())
+ .map(NetworkAddress::from)
+ .collect(collectingAndThen(toUnmodifiableList(), StaticNodeFinder::new));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<NetworkAddress> findNodes() {
+ return addresses;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index be24cac..b029b75 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -86,7 +86,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
})
.config(opts -> opts.memberAlias(consistentId))
.transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
- .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
+ .membership(opts -> opts.seedMembers(parseAddresses(context.getNodeFinder().findNodes())));
// resolve cyclic dependencies
messagingService.setCluster(cluster);
diff --git a/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java b/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java
new file mode 100644
index 0000000..86fe58c
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+/**
+ * {@code NodeFinder} implementation that, like the {@link StaticNodeFinder}, returns a fixed list of addresses:
+ * one {@code "localhost"} address for every port of a given port range.
+ */
+public class LocalPortRangeNodeFinder implements NodeFinder {
+ /** Unmodifiable list of {@code "localhost"} addresses, one per port of the range, built once in the constructor. */
+ private final List<NetworkAddress> addresses;
+
+ /**
+ * Creates a node finder that contains local network addresses over the given port range.
+ *
+ * @param startPort Start port (inclusive).
+ * @param endPort End port (exclusive); a value not greater than {@code startPort} yields an empty list.
+ */
+ public LocalPortRangeNodeFinder(int startPort, int endPort) {
+ addresses = IntStream.range(startPort, endPort)
+ .mapToObj(port -> new NetworkAddress("localhost", port))
+ .collect(toUnmodifiableList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<NetworkAddress> findNodes() {
+ return addresses;
+ }
+}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index bc7b202..8295d09 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -30,13 +30,13 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.CliService;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -58,6 +58,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -114,14 +116,14 @@ public class ITCliServiceTest {
CliOptions opts = new CliOptions();
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
- List<NetworkAddress> memberAddresses = peers.stream()
+ NodeFinder nodeFinder = peers.stream()
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(Collectors.toList());
+ .collect(collectingAndThen(toList(), StaticNodeFinder::new));
var registry = new MessageSerializationRegistryImpl();
- var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, memberAddresses, registry);
+ var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, nodeFinder, registry);
var factory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 0763a16..bf609d2 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -37,14 +37,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.Iterator;
@@ -95,6 +95,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3385,20 +3387,20 @@ public class ITNodeTest {
private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
Configuration initialConf = nodeOptions.getInitialConf();
- var servers = List.<NetworkAddress>of();
+ Stream<PeerId> peers = initialConf == null ?
+ Stream.empty() :
+ Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream());
- if (initialConf != null) {
- servers = Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream())
+ NodeFinder nodeFinder = peers
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(Collectors.toList());
- }
+ .collect(collectingAndThen(toList(), StaticNodeFinder::new));
var nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
+ ClusterService clusterService = createClusterService(peerId.getEndpoint(), nodeFinder);
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
@@ -3420,10 +3422,10 @@ public class ITNodeTest {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
+ private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
var registry = new TestMessageSerializationRegistryImpl();
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index a34c3ad..7cc02ac 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
@@ -56,7 +57,7 @@ abstract class RaftServerAbstractTest {
* @return The client cluster view.
*/
protected ClusterService clusterService(String name, int port, List<NetworkAddress> servers, boolean start) {
- var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+ var context = new ClusterLocalConfiguration(name, port, new StaticNodeFinder(servers), SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 90d25c0..24e2172 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -31,12 +31,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
@@ -55,6 +56,8 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.jetbrains.annotations.Nullable;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -209,22 +212,20 @@ public class TestCluster {
if (!emptyPeers)
nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
- List<NetworkAddress> servers = emptyPeers ?
- List.of() :
- this.peers.stream()
+ NodeFinder nodeFinder = (emptyPeers ? Stream.<PeerId>empty() : peers.stream())
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(Collectors.toList());
+ .collect(collectingAndThen(toList(), StaticNodeFinder::new));
NodeManager nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(listenAddr, servers);
+ ClusterService clusterService = createClusterService(listenAddr, nodeFinder);
var rpcClient = new IgniteRpcClient(clusterService);
nodeOptions.setRpcClient(rpcClient);
- var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
+ var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
clusterService.start();
@@ -258,10 +259,10 @@ public class TestCluster {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
+ private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
var registry = new TestMessageSerializationRegistryImpl();
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
@@ -435,7 +436,7 @@ public class TestCluster {
this.lock.lock();
try {
return this.nodes.stream().map(node -> node.getNodeId().getPeerId().getEndpoint())
- .collect(Collectors.toList());
+ .collect(toList());
}
finally {
this.lock.unlock();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index b1e0133..3827230 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -42,7 +43,7 @@ public class IgniteRpcTest extends AbstractRpcTest {
@Override public RpcServer<?> createServer(Endpoint endpoint) {
ClusterService service = createService(endpoint.toString(), endpoint.getPort());
- var server = new TestIgniteRpcServer(service, List.of(), new NodeManager(), new NodeOptions()) {
+ var server = new TestIgniteRpcServer(service, new NodeManager(), new NodeOptions()) {
@Override public void shutdown() {
super.shutdown();
@@ -84,7 +85,8 @@ public class IgniteRpcTest extends AbstractRpcTest {
*/
private static ClusterService createService(String name, int port, NetworkAddress... servers) {
var registry = new MessageSerializationRegistryImpl();
- var context = new ClusterLocalConfiguration(name, port, List.of(servers), registry);
+ var nodeFinder = new StaticNodeFinder(List.of(servers));
+ var context = new ClusterLocalConfiguration(name, port, nodeFinder, registry);
var factory = new TestScaleCubeClusterServiceFactory();
return factory.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index a621b39..77846bb 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -17,9 +17,7 @@
package org.apache.ignite.raft.jraft.rpc;
-import java.util.List;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -34,16 +32,10 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
/**
* @param clusterService Cluster service.
- * @param servers Server list.
* @param nodeManager Node manager.
* @param nodeOptions Node options.
*/
- public TestIgniteRpcServer(
- ClusterService clusterService,
- List<NetworkAddress> servers,
- NodeManager nodeManager,
- NodeOptions nodeOptions
- ) {
+ public TestIgniteRpcServer(ClusterService clusterService, NodeManager nodeManager, NodeOptions nodeOptions) {
super(
clusterService,
nodeManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index a3face6..ed5a75d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -27,8 +27,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
-
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
import org.apache.ignite.configuration.RootKey;
@@ -58,7 +56,7 @@ import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.utils.IgniteProperties;
import org.jetbrains.annotations.NotNull;
@@ -179,16 +177,14 @@ public class IgnitionImpl implements Ignition {
var serializationRegistry = new MessageSerializationRegistryImpl();
- List<NetworkAddress> peers = Arrays.stream(netConfigurationView.netClusterNodes())
- .map(NetworkAddress::from)
- .collect(Collectors.toUnmodifiableList());
+ var nodeFinder = StaticNodeFinder.fromConfiguration(netConfigurationView);
// Network startup.
ClusterService clusterNetSvc = new ScaleCubeClusterServiceFactory().createClusterService(
new ClusterLocalConfiguration(
nodeName,
netConfigurationView.port(),
- peers,
+ nodeFinder,
serializationRegistry
)
);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 7280fff..fc6284a 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -26,16 +26,17 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -47,16 +48,15 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -113,19 +113,18 @@ public class ITDistributedTableTest {
*/
@BeforeEach
public void beforeTest() {
- List<NetworkAddress> allNodes = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
- .mapToObj(port -> new NetworkAddress("localhost", port))
- .collect(Collectors.toList());
+ var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
- for (int i = 0; i < NODES; i++)
- cluster.add(startClient("node_" + i, NODE_PORT_BASE + i, allNodes));
+ nodeFinder.findNodes().stream()
+ .map(addr -> startClient(addr.toString(), addr.port(), nodeFinder))
+ .forEach(cluster::add);
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
LOG.info("Cluster started.");
- client = startClient("client", NODE_PORT_BASE + NODES, allNodes);
+ client = startClient("client", NODE_PORT_BASE + NODES, nodeFinder);
assertTrue(waitForTopology(client, NODES + 1, 1000));
@@ -501,11 +500,11 @@ public class ITDistributedTableTest {
/**
* @param name Node name.
* @param port Local port.
- * @param servers Server nodes of the cluster.
+ * @param nodeFinder Node finder.
* @return The client cluster view.
*/
- private ClusterService startClient(String name, int port, List<NetworkAddress> servers) {
- var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+ private static ClusterService startClient(String name, int port, NodeFinder nodeFinder) {
+ var context = new ClusterLocalConfiguration(name, port, nodeFinder, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
network.start();
return network;