You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/22 10:30:41 UTC
[ignite-3] 01/01: Revert "IGNITE-14092 Introduce the Node Finder API (#233)"
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch revert-233-ignite-14092
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 72a0982dc0931414d84910638d2b408b1b61ffe1
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Jul 22 13:30:31 2021 +0300
Revert "IGNITE-14092 Introduce the Node Finder API (#233)"
This reverts commit b6799b2757790a5312d0b1cda3375eacafc2b1a9.
---
.../client/ITMetaStorageServiceTest.java | 34 ++++++-------
.../ignite/network/ClusterLocalConfiguration.java | 17 +++----
.../java/org/apache/ignite/network/NodeFinder.java | 32 -------------
modules/network/pom.xml | 5 --
.../network/scalecube/ITNodeRestartsTest.java | 32 ++++++++-----
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 23 +++++----
.../apache/ignite/network/StaticNodeFinder.java | 56 ----------------------
.../scalecube/ScaleCubeClusterServiceFactory.java | 2 +-
.../ignite/network/LocalPortRangeNodeFinder.java | 49 -------------------
.../ignite/raft/jraft/core/ITCliServiceTest.java | 12 ++---
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 24 +++++-----
.../ignite/raft/server/RaftServerAbstractTest.java | 3 +-
.../apache/ignite/raft/jraft/core/TestCluster.java | 23 +++++----
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 6 +--
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 10 +++-
.../apache/ignite/internal/app/IgnitionImpl.java | 9 ++--
.../ignite/distributed/ITDistributedTableTest.java | 29 +++++------
17 files changed, 117 insertions(+), 249 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 18c99d2..19a95a1 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -42,10 +43,8 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
@@ -59,7 +58,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -165,11 +163,12 @@ public class ITMetaStorageServiceTest {
*/
@BeforeEach
public void beforeTest() {
- var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
+ List<NetworkAddress> servers = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
+ .mapToObj(port -> new NetworkAddress("localhost", port))
+ .collect(Collectors.toList());
- nodeFinder.findNodes().stream()
- .map(addr -> startClusterNode(addr, nodeFinder))
- .forEach(cluster::add);
+ for (int i = 0; i < NODES; i++)
+ cluster.add(startClusterNode("node_" + i, NODE_PORT_BASE + i, servers));
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
@@ -329,7 +328,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(toList());
+ map(ByteArray::bytes).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -338,7 +337,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(toList());
+ map(Entry::value).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expVals.get(i), values.get(i));
@@ -368,7 +367,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(toList());
+ map(ByteArray::bytes).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -377,7 +376,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(toList());
+ map(Entry::value).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expVals.get(i), values.get(i));
@@ -460,7 +459,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(toList());
+ map(ByteArray::bytes).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -484,7 +483,7 @@ public class ITMetaStorageServiceTest {
assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(ByteArray::bytes).collect(toList());
+ map(ByteArray::bytes).collect(Collectors.toList());
for (int i = 0; i < expKeys.size(); i++)
assertArrayEquals(expKeys.get(i), keys.get(i));
@@ -964,12 +963,13 @@ public class ITMetaStorageServiceTest {
}
/**
- * @param addr Node address.
- * @param nodeFinder Node finder.
+ * @param name Node name.
+ * @param port Local port.
+ * @param srvs Server nodes of the cluster.
* @return The client cluster view.
*/
- private static ClusterService startClusterNode(NetworkAddress addr, NodeFinder nodeFinder) {
- var ctx = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, SERIALIZATION_REGISTRY);
+ private ClusterService startClusterNode(String name, int port, List<NetworkAddress> srvs) {
+ var ctx = new ClusterLocalConfiguration(name, port, srvs, SERIALIZATION_REGISTRY);
var net = NETWORK_FACTORY.createClusterService(ctx);
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index 6d107b1..535eb81 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network;
+import java.util.List;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -32,8 +33,8 @@ public class ClusterLocalConfiguration {
/** The port. */
private final int port;
- /** Node finder. */
- private final NodeFinder nodeFinder;
+ /** Addresses of other nodes. */
+ private final List<NetworkAddress> memberAddresses;
/** Message mapper providers. */
private final MessageSerializationRegistry serializationRegistry;
@@ -41,15 +42,15 @@ public class ClusterLocalConfiguration {
/**
* @param name Local name.
* @param port Local port.
- * @param nodeFinder Node finder for discovering the initial cluster members.
+ * @param memberAddresses Other cluster member addresses.
* @param serializationRegistry Message serialization registry.
*/
public ClusterLocalConfiguration(
- String name, int port, NodeFinder nodeFinder, MessageSerializationRegistry serializationRegistry
+ String name, int port, List<NetworkAddress> memberAddresses, MessageSerializationRegistry serializationRegistry
) {
this.name = name;
this.port = port;
- this.nodeFinder = nodeFinder;
+ this.memberAddresses = List.copyOf(memberAddresses);
this.serializationRegistry = serializationRegistry;
}
@@ -68,10 +69,10 @@ public class ClusterLocalConfiguration {
}
/**
- * @return Node finder for discovering the initial cluster members.
+ * @return Addresses of other nodes.
*/
- public NodeFinder getNodeFinder() {
- return nodeFinder;
+ public List<NetworkAddress> getMemberAddresses() {
+ return memberAddresses;
}
/**
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java b/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java
deleted file mode 100644
index b7c2874..0000000
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NodeFinder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import java.util.List;
-
-/**
- * Interface for services responsible for discovering the initial set of network cluster members.
- */
-public interface NodeFinder {
- /**
- * Discovers the initial set of cluster members and returns their network addresses.
- *
- * @return addresses of initial cluster members.
- */
- List<NetworkAddress> findNodes();
-}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index c1c62b8..1c1abe6 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -35,11 +35,6 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration</artifactId>
</dependency>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index d09afc8..02af912 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -18,19 +18,18 @@ package org.apache.ignite.network.scalecube;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -61,11 +60,20 @@ class ITNodeRestartsTest {
public void testRestarts() {
final int initPort = 3344;
- var nodeFinder = new LocalPortRangeNodeFinder(initPort, initPort + 5);
+ String addr = "localhost";
+ List<NetworkAddress> addresses = IntStream.range(0, 5)
+ .mapToObj(i -> new NetworkAddress(addr, (initPort + i)))
+ .collect(toList());
- services = nodeFinder.findNodes().stream()
- .map(addr -> startNetwork(addr, nodeFinder))
- .collect(Collectors.toCollection(ArrayList::new)); // ensure mutability
+ services = new ArrayList<>(addresses.size());
+
+ for (int i = 0; i < addresses.size(); i++) {
+ NetworkAddress address = addresses.get(i);
+
+ ClusterService svc = startNetwork(address.toString(), initPort + i, addresses);
+
+ services.add(svc);
+ }
for (ClusterService service : services) {
assertTrue(waitForTopology(service, 5, 5_000), service.topologyService().localMember().toString()
@@ -75,8 +83,6 @@ class ITNodeRestartsTest {
int idx0 = 0;
int idx1 = 2;
- List<NetworkAddress> addresses = nodeFinder.findNodes();
-
LOG.info("Shutdown {}", addresses.get(idx0));
services.get(idx0).shutdown();
@@ -84,11 +90,11 @@ class ITNodeRestartsTest {
services.get(idx1).shutdown();
LOG.info("Starting {}", addresses.get(idx0));
- ClusterService svc0 = startNetwork(addresses.get(idx0), nodeFinder);
+ ClusterService svc0 = startNetwork(addresses.get(idx0).toString(), initPort + idx0, addresses);
services.set(idx0, svc0);
LOG.info("Starting {}", addresses.get(idx1));
- ClusterService svc2 = startNetwork(addresses.get(idx1), nodeFinder);
+ ClusterService svc2 = startNetwork(addresses.get(idx1).toString(), initPort + idx1, addresses);
services.set(idx1, svc2);
for (ClusterService service : services) {
@@ -100,8 +106,8 @@ class ITNodeRestartsTest {
}
/** */
- private ClusterService startNetwork(NetworkAddress addr, NodeFinder nodeFinder) {
- var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
+ private ClusterService startNetwork(String name, int port, List<NetworkAddress> addresses) {
+ var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 96d6155..a2c78da 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -26,8 +26,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
import org.apache.ignite.internal.network.NetworkMessageTypes;
@@ -35,10 +35,8 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessageTypes;
@@ -367,25 +365,26 @@ class ITScaleCubeNetworkMessagingTest {
int initialPort = 3344;
- var nodeFinder = new LocalPortRangeNodeFinder(initialPort, initialPort + numOfNodes);
-
- var isInitial = new AtomicBoolean(true);
+ List<NetworkAddress> addresses = IntStream.range(0, numOfNodes)
+ .mapToObj(i -> new NetworkAddress("localhost", initialPort + i))
+ .collect(Collectors.toUnmodifiableList());
- members = nodeFinder.findNodes().stream()
- .map(addr -> startNode(addr, nodeFinder, isInitial.getAndSet(false)))
+ members = IntStream.range(0, numOfNodes)
+ .mapToObj(i -> startNode("Node #" + i, initialPort + i, addresses, i == 0))
.collect(Collectors.toUnmodifiableList());
}
/**
* Start cluster node.
*
- * @param addr Node address.
- * @param nodeFinder Node finder.
+ * @param name Node name.
+ * @param port Node port.
+ * @param addresses Addresses of other nodes.
* @param initial Whether this node is the first one.
* @return Started cluster node.
*/
- private ClusterService startNode(NetworkAddress addr, NodeFinder nodeFinder, boolean initial) {
- var context = new ClusterLocalConfiguration(addr.toString(), addr.port(), nodeFinder, serializationRegistry);
+ private ClusterService startNode(String name, int port, List<NetworkAddress> addresses, boolean initial) {
+ var context = new ClusterLocalConfiguration(name, port, addresses, serializationRegistry);
ClusterService clusterService = networkFactory.createClusterService(context);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java b/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java
deleted file mode 100644
index 8bb891a..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/StaticNodeFinder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
-
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.toUnmodifiableList;
-
-/**
- * {@code NodeFinder} implementation that encapsulates a predefined list of network addresses.
- */
-public class StaticNodeFinder implements NodeFinder {
- /** */
- private final List<NetworkAddress> addresses;
-
- /**
- * @param addresses Addresses of initial cluster members.
- */
- public StaticNodeFinder(List<NetworkAddress> addresses) {
- this.addresses = addresses;
- }
-
- /**
- * Creates a node finder extracting the initial cluster member addresses from the given configuration.
- *
- * @param networkConfiguration Network configuration.
- */
- public static StaticNodeFinder fromConfiguration(NetworkView networkConfiguration) {
- return Arrays.stream(networkConfiguration.netClusterNodes())
- .map(NetworkAddress::from)
- .collect(collectingAndThen(toUnmodifiableList(), StaticNodeFinder::new));
- }
-
- /** {@inheritDoc} */
- @Override public List<NetworkAddress> findNodes() {
- return addresses;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index b029b75..be24cac 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -86,7 +86,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
})
.config(opts -> opts.memberAlias(consistentId))
.transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
- .membership(opts -> opts.seedMembers(parseAddresses(context.getNodeFinder().findNodes())));
+ .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
// resolve cyclic dependencies
messagingService.setCluster(cluster);
diff --git a/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java b/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java
deleted file mode 100644
index 86fe58c..0000000
--- a/modules/network/src/test/java/org/apache/ignite/network/LocalPortRangeNodeFinder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import java.util.List;
-import java.util.stream.IntStream;
-
-import static java.util.stream.Collectors.toUnmodifiableList;
-
-/**
- * {@code NodeFinder} implementation similar to the {@link StaticNodeFinder} but encapsulates a list of static local
- * addresses over a port range.
- */
-public class LocalPortRangeNodeFinder implements NodeFinder {
- /** */
- private final List<NetworkAddress> addresses;
-
- /**
- * Creates a node finder that contains local network addresses over the given port range.
- *
- * @param startPort Start port (including).
- * @param endPort End port (excluding).
- */
- public LocalPortRangeNodeFinder(int startPort, int endPort) {
- addresses = IntStream.range(startPort, endPort)
- .mapToObj(port -> new NetworkAddress("localhost", port))
- .collect(toUnmodifiableList());
- }
-
- /** {@inheritDoc} */
- @Override public List<NetworkAddress> findNodes() {
- return addresses;
- }
-}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index 8295d09..bc7b202 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -30,13 +30,13 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NodeFinder;
-import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.CliService;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -58,8 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Thread.sleep;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -116,14 +114,14 @@ public class ITCliServiceTest {
CliOptions opts = new CliOptions();
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
- NodeFinder nodeFinder = peers.stream()
+ List<NetworkAddress> memberAddresses = peers.stream()
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(collectingAndThen(toList(), StaticNodeFinder::new));
+ .collect(Collectors.toList());
var registry = new MessageSerializationRegistryImpl();
- var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, nodeFinder, registry);
+ var serviceConfig = new ClusterLocalConfiguration("client", TestUtils.INIT_PORT - 1, memberAddresses, registry);
var factory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index bf609d2..0763a16 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -37,14 +37,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NodeFinder;
-import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.Iterator;
@@ -95,8 +95,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3387,20 +3385,20 @@ public class ITNodeTest {
private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
Configuration initialConf = nodeOptions.getInitialConf();
- Stream<PeerId> peers = initialConf == null ?
- Stream.empty() :
- Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream());
+ var servers = List.<NetworkAddress>of();
- NodeFinder nodeFinder = peers
+ if (initialConf != null) {
+ servers = Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream())
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(collectingAndThen(toList(), StaticNodeFinder::new));
+ .collect(Collectors.toList());
+ }
var nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(peerId.getEndpoint(), nodeFinder);
+ ClusterService clusterService = createClusterService(peerId.getEndpoint(), servers);
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
@@ -3422,10 +3420,10 @@ public class ITNodeTest {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
+ private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
var registry = new TestMessageSerializationRegistryImpl();
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index 7cc02ac..a34c3ad 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
@@ -57,7 +56,7 @@ abstract class RaftServerAbstractTest {
* @return The client cluster view.
*/
protected ClusterService clusterService(String name, int port, List<NetworkAddress> servers, boolean start) {
- var context = new ClusterLocalConfiguration(name, port, new StaticNodeFinder(servers), SERIALIZATION_REGISTRY);
+ var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 24e2172..90d25c0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -31,13 +31,12 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NodeFinder;
-import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
@@ -56,8 +55,6 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.jetbrains.annotations.Nullable;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -212,20 +209,22 @@ public class TestCluster {
if (!emptyPeers)
nodeOptions.setInitialConf(new Configuration(this.peers, this.learners));
- NodeFinder nodeFinder = (emptyPeers ? Stream.<PeerId>empty() : peers.stream())
+ List<NetworkAddress> servers = emptyPeers ?
+ List.of() :
+ this.peers.stream()
.map(PeerId::getEndpoint)
.map(JRaftUtils::addressFromEndpoint)
- .collect(collectingAndThen(toList(), StaticNodeFinder::new));
+ .collect(Collectors.toList());
NodeManager nodeManager = new NodeManager();
- ClusterService clusterService = createClusterService(listenAddr, nodeFinder);
+ ClusterService clusterService = createClusterService(listenAddr, servers);
var rpcClient = new IgniteRpcClient(clusterService);
nodeOptions.setRpcClient(rpcClient);
- var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
+ var rpcServer = new TestIgniteRpcServer(clusterService, servers, nodeManager, nodeOptions);
clusterService.start();
@@ -259,10 +258,10 @@ public class TestCluster {
/**
* Creates a non-started {@link ClusterService}.
*/
- private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
+ private static ClusterService createClusterService(Endpoint endpoint, List<NetworkAddress> members) {
var registry = new TestMessageSerializationRegistryImpl();
- var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry);
+ var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), members, registry);
var clusterServiceFactory = new TestScaleCubeClusterServiceFactory();
@@ -436,7 +435,7 @@ public class TestCluster {
this.lock.lock();
try {
return this.nodes.stream().map(node -> node.getNodeId().getPeerId().getEndpoint())
- .collect(toList());
+ .collect(Collectors.toList());
}
finally {
this.lock.unlock();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 3827230..b1e0133 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -43,7 +42,7 @@ public class IgniteRpcTest extends AbstractRpcTest {
@Override public RpcServer<?> createServer(Endpoint endpoint) {
ClusterService service = createService(endpoint.toString(), endpoint.getPort());
- var server = new TestIgniteRpcServer(service, new NodeManager(), new NodeOptions()) {
+ var server = new TestIgniteRpcServer(service, List.of(), new NodeManager(), new NodeOptions()) {
@Override public void shutdown() {
super.shutdown();
@@ -85,8 +84,7 @@ public class IgniteRpcTest extends AbstractRpcTest {
*/
private static ClusterService createService(String name, int port, NetworkAddress... servers) {
var registry = new MessageSerializationRegistryImpl();
- var nodeFinder = new StaticNodeFinder(List.of(servers));
- var context = new ClusterLocalConfiguration(name, port, nodeFinder, registry);
+ var context = new ClusterLocalConfiguration(name, port, List.of(servers), registry);
var factory = new TestScaleCubeClusterServiceFactory();
return factory.createClusterService(context);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 77846bb..a621b39 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -17,7 +17,9 @@
package org.apache.ignite.raft.jraft.rpc;
+import java.util.List;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -32,10 +34,16 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
/**
* @param clusterService Cluster service.
+ * @param servers Server list.
* @param nodeManager Node manager.
* @param nodeOptions Node options.
*/
- public TestIgniteRpcServer(ClusterService clusterService, NodeManager nodeManager, NodeOptions nodeOptions) {
+ public TestIgniteRpcServer(
+ ClusterService clusterService,
+ List<NetworkAddress> servers,
+ NodeManager nodeManager,
+ NodeOptions nodeOptions
+ ) {
super(
clusterService,
nodeManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 9052fe6..bf6f300 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
import org.apache.ignite.configuration.RootKey;
@@ -55,7 +56,7 @@ import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
@@ -177,14 +178,16 @@ public class IgnitionImpl implements Ignition {
var serializationRegistry = new MessageSerializationRegistryImpl();
- var nodeFinder = StaticNodeFinder.fromConfiguration(netConfigurationView);
+ List<NetworkAddress> peers = Arrays.stream(netConfigurationView.netClusterNodes())
+ .map(NetworkAddress::from)
+ .collect(Collectors.toUnmodifiableList());
// Network startup.
ClusterService clusterNetSvc = new ScaleCubeClusterServiceFactory().createClusterService(
new ClusterLocalConfiguration(
nodeName,
netConfigurationView.port(),
- nodeFinder,
+ peers,
serializationRegistry
)
);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index fc6284a..7280fff 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -26,17 +26,16 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -48,15 +47,16 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.LocalPortRangeNodeFinder;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -113,18 +113,19 @@ public class ITDistributedTableTest {
*/
@BeforeEach
public void beforeTest() {
- var nodeFinder = new LocalPortRangeNodeFinder(NODE_PORT_BASE, NODE_PORT_BASE + NODES);
+ List<NetworkAddress> allNodes = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
+ .mapToObj(port -> new NetworkAddress("localhost", port))
+ .collect(Collectors.toList());
- nodeFinder.findNodes().stream()
- .map(addr -> startClient(addr.toString(), addr.port(), nodeFinder))
- .forEach(cluster::add);
+ for (int i = 0; i < NODES; i++)
+ cluster.add(startClient("node_" + i, NODE_PORT_BASE + i, allNodes));
for (ClusterService node : cluster)
assertTrue(waitForTopology(node, NODES, 1000));
LOG.info("Cluster started.");
- client = startClient("client", NODE_PORT_BASE + NODES, nodeFinder);
+ client = startClient("client", NODE_PORT_BASE + NODES, allNodes);
assertTrue(waitForTopology(client, NODES + 1, 1000));
@@ -500,11 +501,11 @@ public class ITDistributedTableTest {
/**
* @param name Node name.
* @param port Local port.
- * @param nodeFinder Node finder.
+ * @param servers Server nodes of the cluster.
* @return The client cluster view.
*/
- private static ClusterService startClient(String name, int port, NodeFinder nodeFinder) {
- var context = new ClusterLocalConfiguration(name, port, nodeFinder, SERIALIZATION_REGISTRY);
+ private ClusterService startClient(String name, int port, List<NetworkAddress> servers) {
+ var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
var network = NETWORK_FACTORY.createClusterService(context);
network.start();
return network;